From d01e444c90cd221fe9b9729488ba5849da1f4bcf Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 9 Jun 2021 09:58:00 +1000 Subject: [PATCH 1/9] Import anyhow::Result instead of fully-qualifying it --- swap/src/network/tor_transport.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/swap/src/network/tor_transport.rs b/swap/src/network/tor_transport.rs index 772470af..a2fc0b56 100644 --- a/swap/src/network/tor_transport.rs +++ b/swap/src/network/tor_transport.rs @@ -1,4 +1,4 @@ -use anyhow::anyhow; +use anyhow::{anyhow, Result}; use data_encoding::BASE32; use futures::future::Ready; use futures::prelude::*; @@ -76,7 +76,7 @@ impl Transport for TorTcpConfig { /// Tor expects an address format of ADDR:PORT. /// This helper function tries to convert the provided multi-address into this /// format. None is returned if an unsupported protocol was provided. -fn to_address_string(multi: Multiaddr) -> anyhow::Result { +fn to_address_string(multi: Multiaddr) -> Result { let mut protocols = multi.iter(); let address_string = match protocols.next() { // if it is an Onion address, we have all we need and can return From 97a09807dde719e71c852506a16d27fce247430b Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 9 Jun 2021 11:19:50 +1000 Subject: [PATCH 2/9] Use `BoxFuture` type alias to avoid clippy's complexity nag --- swap/src/network/tor_transport.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/swap/src/network/tor_transport.rs b/swap/src/network/tor_transport.rs index a2fc0b56..ee4eb3d2 100644 --- a/swap/src/network/tor_transport.rs +++ b/swap/src/network/tor_transport.rs @@ -1,7 +1,6 @@ use anyhow::{anyhow, Result}; use data_encoding::BASE32; -use futures::future::Ready; -use futures::prelude::*; +use futures::future::{BoxFuture, Ready}; use libp2p::core::multiaddr::{Multiaddr, Protocol}; use libp2p::core::transport::TransportError; use libp2p::core::Transport; @@ -9,7 +8,6 @@ use libp2p::tcp::tokio::{Tcp, TcpStream}; use libp2p::tcp::{GenTcpConfig, TcpListenStream, TokioTcpConfig}; use std::io; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; -use std::pin::Pin; use tokio_socks::tcp::Socks5Stream; use tokio_socks::IntoTargetAddr; @@ -35,8 +33,7 @@ impl Transport for TorTcpConfig { type Error = io::Error; type Listener = TcpListenStream; type ListenerUpgrade = Ready>; - #[allow(clippy::type_complexity)] - type Dial = Pin> + Send>>; + type Dial = BoxFuture<'static, Result>; fn listen_on(self, addr: Multiaddr) -> Result> { self.inner.listen_on(addr) From ea0fd1eb53aa544a680fe2368cbfe90be4963a8f Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 9 Jun 2021 11:26:07 +1000 Subject: [PATCH 3/9] Inline functions to reduce complexity induced by indirection --- swap/src/network/tor_transport.rs | 36 +++++++++++-------------------- 1 file changed, 13 insertions(+), 23 deletions(-) diff --git a/swap/src/network/tor_transport.rs b/swap/src/network/tor_transport.rs index ee4eb3d2..79d16618 100644 --- a/swap/src/network/tor_transport.rs +++ b/swap/src/network/tor_transport.rs @@ -1,6 +1,6 @@ use anyhow::{anyhow, Result}; use data_encoding::BASE32; -use futures::future::{BoxFuture, Ready}; +use futures::future::{BoxFuture, FutureExt, Ready}; use libp2p::core::multiaddr::{Multiaddr, Protocol}; use libp2p::core::transport::TransportError; use libp2p::core::Transport; @@ -9,7 +9,6 @@ use libp2p::tcp::{GenTcpConfig, TcpListenStream, TokioTcpConfig}; use std::io; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use tokio_socks::tcp::Socks5Stream; -use tokio_socks::IntoTargetAddr; /// Represents the configuration for a Tor transport for libp2p. #[derive(Clone)] @@ -42,19 +41,20 @@ impl Transport for TorTcpConfig { // dials via Tor's socks5 proxy if configured and if the provided address is an // onion address. or it falls back to Tcp dialling fn dial(self, addr: Multiaddr) -> Result> { - async fn do_tor_dial(socks_port: u16, dest: String) -> Result { - tracing::trace!("Connecting through Tor proxy to address: {}", dest); - let stream = connect_to_socks_proxy(dest, socks_port) - .await - .map_err(|e| io::Error::new(io::ErrorKind::ConnectionRefused, e))?; - tracing::trace!("Connection through Tor established"); - Ok(stream) - } - match to_address_string(addr.clone()) { - Ok(tor_address_string) => { - Ok(Box::pin(do_tor_dial(self.socks_port, tor_address_string))) + Ok(tor_address_string) => Ok(async move { + tracing::trace!("Connecting through Tor proxy to address: {}", addr); + + let sock = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, self.socks_port)); + let stream = Socks5Stream::connect(sock, tor_address_string) + .await + .map_err(|e| io::Error::new(io::ErrorKind::ConnectionRefused, e))?; + + tracing::trace!("Connection through Tor established"); + + Ok(TcpStream(stream.into_inner())) } + .boxed()), Err(error) => { tracing::warn!( address = %addr, @@ -111,16 +111,6 @@ fn to_address_string(multi: Multiaddr) -> Result { } } -/// Connect to the SOCKS5 proxy socket. -async fn connect_to_socks_proxy<'a>( - dest: impl IntoTargetAddr<'a>, - port: u16, -) -> Result { - let sock = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port)); - let stream = Socks5Stream::connect(sock, dest).await?; - Ok(TcpStream(stream.into_inner())) -} - #[cfg(test)] pub mod test { use crate::network::tor_transport::to_address_string; From 8bd6c9dcfc1619867c3db9054ce2b8df01783b5d Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 9 Jun 2021 11:30:48 +1000 Subject: [PATCH 4/9] Simplify construction of Socks5Stream --- swap/src/network/tor_transport.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/swap/src/network/tor_transport.rs b/swap/src/network/tor_transport.rs index 79d16618..e6154ef3 100644 --- a/swap/src/network/tor_transport.rs +++ b/swap/src/network/tor_transport.rs @@ -7,7 +7,7 @@ use libp2p::core::Transport; use libp2p::tcp::tokio::{Tcp, TcpStream}; use libp2p::tcp::{GenTcpConfig, TcpListenStream, TokioTcpConfig}; use std::io; -use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; +use std::net::Ipv4Addr; use tokio_socks::tcp::Socks5Stream; /// Represents the configuration for a Tor transport for libp2p. @@ -45,10 +45,12 @@ impl Transport for TorTcpConfig { Ok(tor_address_string) => Ok(async move { tracing::trace!("Connecting through Tor proxy to address: {}", addr); - let sock = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, self.socks_port)); - let stream = Socks5Stream::connect(sock, tor_address_string) - .await - .map_err(|e| io::Error::new(io::ErrorKind::ConnectionRefused, e))?; + let stream = Socks5Stream::connect( + (Ipv4Addr::LOCALHOST, self.socks_port), + tor_address_string, + ) + .await + .map_err(|e| io::Error::new(io::ErrorKind::ConnectionRefused, e))?; tracing::trace!("Connection through Tor established"); From d19231d811d896da3b238a5d8b8d76fa7b9e276a Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 9 Jun 2021 11:43:26 +1000 Subject: [PATCH 5/9] Refactor Tor transport to be dial-only Libp2p's transports are meant to be composed. Hence, any form of fallback should be implemented by emitting `MultiaddrNotSupported` from the `listen` and `dial` functions. This allows us to completely remove the tcp transport from the tor transport. --- swap/src/network/tor_transport.rs | 140 +++++++++++++----------------- swap/src/network/transport.rs | 13 +-- 2 files changed, 66 insertions(+), 87 deletions(-) diff --git a/swap/src/network/tor_transport.rs b/swap/src/network/tor_transport.rs index e6154ef3..d6b82e5b 100644 --- a/swap/src/network/tor_transport.rs +++ b/swap/src/network/tor_transport.rs @@ -1,33 +1,28 @@ -use anyhow::{anyhow, Result}; +use anyhow::Result; use data_encoding::BASE32; use futures::future::{BoxFuture, FutureExt, Ready}; use libp2p::core::multiaddr::{Multiaddr, Protocol}; use libp2p::core::transport::TransportError; use libp2p::core::Transport; use libp2p::tcp::tokio::{Tcp, TcpStream}; -use libp2p::tcp::{GenTcpConfig, TcpListenStream, TokioTcpConfig}; +use libp2p::tcp::TcpListenStream; use std::io; use std::net::Ipv4Addr; use tokio_socks::tcp::Socks5Stream; -/// Represents the configuration for a Tor transport for libp2p. +/// A [`Transport`] that can dial onion addresses through a running Tor daemon. #[derive(Clone)] -pub struct TorTcpConfig { - inner: GenTcpConfig, - /// Tor SOCKS5 proxy port number. +pub struct TorDialOnlyTransport { socks_port: u16, } -impl TorTcpConfig { - pub fn new(tcp: TokioTcpConfig, socks_port: u16) -> Self { - Self { - inner: tcp, - socks_port, - } +impl TorDialOnlyTransport { + pub fn new(socks_port: u16) -> Self { + Self { socks_port } } } -impl Transport for TorTcpConfig { +impl Transport for TorDialOnlyTransport { type Output = TcpStream; type Error = io::Error; type Listener = TcpListenStream; @@ -35,48 +30,41 @@ impl Transport for TorTcpConfig { type Dial = BoxFuture<'static, Result>; fn listen_on(self, addr: Multiaddr) -> Result> { - self.inner.listen_on(addr) + Err(TransportError::MultiaddrNotSupported(addr)) } - // dials via Tor's socks5 proxy if configured and if the provided address is an - // onion address. or it falls back to Tcp dialling fn dial(self, addr: Multiaddr) -> Result> { - match to_address_string(addr.clone()) { - Ok(tor_address_string) => Ok(async move { - tracing::trace!("Connecting through Tor proxy to address: {}", addr); + let tor_address_string = fmt_as_address_string(addr.clone())?; - let stream = Socks5Stream::connect( - (Ipv4Addr::LOCALHOST, self.socks_port), - tor_address_string, - ) - .await - .map_err(|e| io::Error::new(io::ErrorKind::ConnectionRefused, e))?; + let dial_future = async move { + tracing::trace!("Connecting through Tor proxy to address: {}", addr); - tracing::trace!("Connection through Tor established"); + let stream = + Socks5Stream::connect((Ipv4Addr::LOCALHOST, self.socks_port), tor_address_string) + .await + .map_err(|e| io::Error::new(io::ErrorKind::ConnectionRefused, e))?; - Ok(TcpStream(stream.into_inner())) - } - .boxed()), - Err(error) => { - tracing::warn!( - address = %addr, - "Address could not be formatted. Dialling via clear net. Error {:#}", error, - ); - self.inner.dial(addr) - } - } + tracing::trace!("Connection through Tor established"); + + Ok(TcpStream(stream.into_inner())) + }; + + Ok(dial_future.boxed()) } - fn address_translation(&self, listen: &Multiaddr, observed: &Multiaddr) -> Option { - self.inner.address_translation(listen, observed) + fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option { + None } } -/// Tor expects an address format of ADDR:PORT. -/// This helper function tries to convert the provided multi-address into this -/// format. None is returned if an unsupported protocol was provided. -fn to_address_string(multi: Multiaddr) -> Result { +/// Formats the given [`Multiaddr`] as an "address" string. +/// +/// For our purposes, we define an address as {HOST}(.{TLD}):{PORT}. This format +/// is what is compatible with the Tor daemon and allows us to route traffic +/// through Tor. +fn fmt_as_address_string(multi: Multiaddr) -> Result> { let mut protocols = multi.iter(); + let address_string = match protocols.next() { // if it is an Onion address, we have all we need and can return Some(Protocol::Onion3(addr)) => { @@ -87,43 +75,33 @@ fn to_address_string(multi: Multiaddr) -> Result { )) } // Deal with non-onion addresses - Some(Protocol::Ip4(addr)) => Some(format!("{}", addr)), - Some(Protocol::Ip6(addr)) => Some(format!("{}", addr)), - Some(Protocol::Dns(addr)) => Some(format!("{}", addr)), - Some(Protocol::Dns4(addr)) => Some(format!("{}", addr)), - _ => None, - } - .ok_or_else(|| { - anyhow!( - "Could not format address {}. Please consider reporting this issue. ", - multi - ) - })?; - - let port_string = match protocols.next() { - Some(Protocol::Tcp(port)) => Some(format!("{}", port)), - Some(Protocol::Udp(port)) => Some(format!("{}", port)), - _ => None, + Some(Protocol::Ip4(addr)) => format!("{}", addr), + Some(Protocol::Ip6(addr)) => format!("{}", addr), + Some(Protocol::Dns(addr)) => format!("{}", addr), + Some(Protocol::Dns4(addr)) => format!("{}", addr), + _ => return Err(TransportError::MultiaddrNotSupported(multi)), }; - if let Some(port) = port_string { - Ok(format!("{}:{}", address_string, port)) - } else { - Ok(address_string) - } + let port = match protocols.next() { + Some(Protocol::Tcp(port)) => port, + Some(Protocol::Udp(port)) => port, + _ => return Err(TransportError::MultiaddrNotSupported(multi)), + }; + + Ok(format!("{}:{}", address_string, port)) } #[cfg(test)] pub mod test { - use crate::network::tor_transport::to_address_string; + use super::*; #[test] fn test_tor_address_string() { let address = "/onion3/oarchy4tamydxcitaki6bc2v4leza6v35iezmu2chg2bap63sv6f2did:1024/p2p/12D3KooWPD4uHN74SHotLN7VCH7Fm8zZgaNVymYcpeF1fpD2guc9" ; - let address_string = - to_address_string(address.parse().unwrap()).expect("To be a multi formatted address."); + let address_string = fmt_as_address_string(address.parse().unwrap()) + .expect("To be a multi formatted address."); assert_eq!( address_string, "oarchy4tamydxcitaki6bc2v4leza6v35iezmu2chg2bap63sv6f2did.onion:1024" @@ -133,55 +111,55 @@ pub mod test { #[test] fn tcp_to_address_string_should_be_some() { let address = "/ip4/127.0.0.1/tcp/7777"; - let address_string = - to_address_string(address.parse().unwrap()).expect("To be a formatted multi address. "); + let address_string = fmt_as_address_string(address.parse().unwrap()) + .expect("To be a formatted multi address. "); assert_eq!(address_string, "127.0.0.1:7777"); } #[test] fn ip6_to_address_string_should_be_some() { let address = "/ip6/2001:db8:85a3:8d3:1319:8a2e:370:7348/tcp/7777"; - let address_string = - to_address_string(address.parse().unwrap()).expect("To be a formatted multi address. "); + let address_string = fmt_as_address_string(address.parse().unwrap()) + .expect("To be a formatted multi address. "); assert_eq!(address_string, "2001:db8:85a3:8d3:1319:8a2e:370:7348:7777"); } #[test] fn udp_to_address_string_should_be_some() { let address = "/ip4/127.0.0.1/udp/7777"; - let address_string = - to_address_string(address.parse().unwrap()).expect("To be a formatted multi address. "); + let address_string = fmt_as_address_string(address.parse().unwrap()) + .expect("To be a formatted multi address. "); assert_eq!(address_string, "127.0.0.1:7777"); } #[test] fn ws_to_address_string_should_be_some() { let address = "/ip4/127.0.0.1/tcp/7777/ws"; - let address_string = - to_address_string(address.parse().unwrap()).expect("To be a formatted multi address. "); + let address_string = fmt_as_address_string(address.parse().unwrap()) + .expect("To be a formatted multi address. "); assert_eq!(address_string, "127.0.0.1:7777"); } #[test] fn dns4_to_address_string_should_be_some() { let address = "/dns4/randomdomain.com/tcp/7777"; - let address_string = - to_address_string(address.parse().unwrap()).expect("To be a formatted multi address. "); + let address_string = fmt_as_address_string(address.parse().unwrap()) + .expect("To be a formatted multi address. "); assert_eq!(address_string, "randomdomain.com:7777"); } #[test] fn dns_to_address_string_should_be_some() { let address = "/dns/randomdomain.com/tcp/7777"; - let address_string = - to_address_string(address.parse().unwrap()).expect("To be a formatted multi address. "); + let address_string = fmt_as_address_string(address.parse().unwrap()) + .expect("To be a formatted multi address. "); assert_eq!(address_string, "randomdomain.com:7777"); } #[test] fn dnsaddr_to_address_string_should_be_none() { let address = "/dnsaddr/randomdomain.com"; - let address_string = to_address_string(address.parse().unwrap()).ok(); + let address_string = fmt_as_address_string(address.parse().unwrap()).ok(); assert_eq!(address_string, None); } } diff --git a/swap/src/network/transport.rs b/swap/src/network/transport.rs index b5f01f85..c4fa929d 100644 --- a/swap/src/network/transport.rs +++ b/swap/src/network/transport.rs @@ -1,4 +1,4 @@ -use crate::network::tor_transport::TorTcpConfig; +use crate::network::tor_transport::TorDialOnlyTransport; use anyhow::Result; use libp2p::core::muxing::StreamMuxerBox; use libp2p::core::transport::Boxed; @@ -51,12 +51,13 @@ pub fn build_tor(id_keys: &identity::Keypair, tor_socks5_port: u16) -> Result Date: Wed, 9 Jun 2021 12:03:06 +1000 Subject: [PATCH 6/9] Rename swarm constructors to be per tool instead of per role --- swap/src/bin/asb.rs | 2 +- swap/src/bin/swap.rs | 4 ++-- swap/src/network/swarm.rs | 4 ++-- swap/tests/harness/mod.rs | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index eea3bcce..f8212ede 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -147,7 +147,7 @@ async fn main() -> Result<()> { let current_balance = monero_wallet.get_balance().await?; let lock_fee = monero_wallet.static_tx_fee_estimate(); let kraken_rate = KrakenRate::new(config.maker.ask_spread, kraken_price_updates); - let mut swarm = swarm::alice( + let mut swarm = swarm::asb( &seed, current_balance, lock_fee, diff --git a/swap/src/bin/swap.rs b/swap/src/bin/swap.rs index 1db5b150..23190493 100644 --- a/swap/src/bin/swap.rs +++ b/swap/src/bin/swap.rs @@ -85,7 +85,7 @@ async fn main() -> Result<()> { init_monero_wallet(data_dir, monero_daemon_address, env_config).await?; let bitcoin_wallet = Arc::new(bitcoin_wallet); - let mut swarm = swarm::bob(&seed, seller_peer_id, tor_socks5_port).await?; + let mut swarm = swarm::cli(&seed, seller_peer_id, tor_socks5_port).await?; swarm .behaviour_mut() .add_address(seller_peer_id, seller_addr); @@ -185,7 +185,7 @@ async fn main() -> Result<()> { let seller_peer_id = db.get_peer_id(swap_id)?; - let mut swarm = swarm::bob(&seed, seller_peer_id, tor_socks5_port).await?; + let mut swarm = swarm::cli(&seed, seller_peer_id, tor_socks5_port).await?; let our_peer_id = swarm.local_peer_id(); tracing::debug!(peer_id = %our_peer_id, "Initializing network module"); swarm diff --git a/swap/src/network/swarm.rs b/swap/src/network/swarm.rs index 5a27a8c0..2e2a8e1d 100644 --- a/swap/src/network/swarm.rs +++ b/swap/src/network/swarm.rs @@ -9,7 +9,7 @@ use libp2p::{PeerId, Swarm}; use std::fmt::Debug; #[allow(clippy::too_many_arguments)] -pub fn alice( +pub fn asb( seed: &Seed, balance: monero::Amount, lock_fee: monero::Amount, @@ -36,7 +36,7 @@ where ) } -pub async fn bob( +pub async fn cli( seed: &Seed, alice: PeerId, tor_socks5_port: u16, diff --git a/swap/tests/harness/mod.rs b/swap/tests/harness/mod.rs index 3807cb94..3d266bd5 100644 --- a/swap/tests/harness/mod.rs +++ b/swap/tests/harness/mod.rs @@ -231,7 +231,7 @@ async fn start_alice( let latest_rate = FixedRate::default(); let resume_only = false; - let mut swarm = swarm::alice( + let mut swarm = swarm::asb( &seed, current_balance, lock_fee, @@ -449,7 +449,7 @@ impl BobParams { ) -> Result<(bob::EventLoop, bob::EventLoopHandle)> { let tor_socks5_port = get_port() .expect("We don't care about Tor in the tests so we get a free port to disable it."); - let mut swarm = swarm::bob(&self.seed, self.alice_peer_id, tor_socks5_port).await?; + let mut swarm = swarm::cli(&self.seed, self.alice_peer_id, tor_socks5_port).await?; swarm .behaviour_mut() .add_address(self.alice_peer_id, self.alice_address.clone()); From 8a30ef725c3084d010430305f3ee6f0b3f714bfa Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 9 Jun 2021 12:13:29 +1000 Subject: [PATCH 7/9] Refactor transports to construct them specific for each application Instead of splitting up the transports into capabilities, we compose them directly for each application. This allows us to remove the websocket transport for the CLI which is really only needed for the ASB to allow retrieval of quotes via the browser. --- CHANGELOG.md | 6 +++ swap/src/asb.rs | 1 + swap/src/asb/transport.rs | 19 +++++++++ swap/src/cli.rs | 1 + swap/src/cli/transport.rs | 32 ++++++++++++++ swap/src/network/swarm.rs | 77 +++++++++++++--------------------- swap/src/network/transport.rs | 79 ++++++++++------------------------- 7 files changed, 111 insertions(+), 104 deletions(-) create mode 100644 swap/src/asb/transport.rs create mode 100644 swap/src/cli/transport.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 03c055b9..39f1d5fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Printing the deposit address to the terminal as a QR code. To not break automated scripts or integrations with other software, this behaviour is disabled if `--json` is passed to the application. +### Removed + +- The websocket transport from the CLI. + Websockets were only ever intended to be used for the ASB side to allow websites to retrieve quotes. + The CLI can use regular TCP connections and having both - TCP and websockets - causes problems and unnecessary overhead. + ## [0.7.0] - 2021-05-28 ### Fixed diff --git a/swap/src/asb.rs b/swap/src/asb.rs index 222046c1..47ceb072 100644 --- a/swap/src/asb.rs +++ b/swap/src/asb.rs @@ -2,5 +2,6 @@ pub mod command; pub mod config; mod rate; pub mod tracing; +pub mod transport; pub use rate::Rate; diff --git a/swap/src/asb/transport.rs b/swap/src/asb/transport.rs new file mode 100644 index 00000000..a552a8f0 --- /dev/null +++ b/swap/src/asb/transport.rs @@ -0,0 +1,19 @@ +use crate::network::transport::authenticate_and_multiplex; +use anyhow::Result; +use libp2p::core::muxing::StreamMuxerBox; +use libp2p::core::transport::Boxed; +use libp2p::dns::TokioDnsConfig; +use libp2p::tcp::TokioTcpConfig; +use libp2p::websocket::WsConfig; +use libp2p::{identity, PeerId, Transport}; + +/// Creates the libp2p transport for the ASB. +pub fn new(identity: &identity::Keypair) -> Result> { + let tcp = TokioTcpConfig::new().nodelay(true); + let tcp_with_dns = TokioDnsConfig::system(tcp)?; + let websocket_with_dns = WsConfig::new(tcp_with_dns.clone()); + + let transport = tcp_with_dns.or_transport(websocket_with_dns).boxed(); + + authenticate_and_multiplex(transport, identity) +} diff --git a/swap/src/cli.rs b/swap/src/cli.rs index e28b6580..7962efd8 100644 --- a/swap/src/cli.rs +++ b/swap/src/cli.rs @@ -1,2 +1,3 @@ pub mod command; pub mod tracing; +pub mod transport; diff --git a/swap/src/cli/transport.rs b/swap/src/cli/transport.rs new file mode 100644 index 00000000..68a5cd1a --- /dev/null +++ b/swap/src/cli/transport.rs @@ -0,0 +1,32 @@ +use crate::network::tor_transport::TorDialOnlyTransport; +use crate::network::transport::authenticate_and_multiplex; +use anyhow::Result; +use libp2p::core::muxing::StreamMuxerBox; +use libp2p::core::transport::{Boxed, OptionalTransport}; +use libp2p::dns::TokioDnsConfig; +use libp2p::tcp::TokioTcpConfig; +use libp2p::{identity, PeerId, Transport}; + +/// Creates the libp2p transport for the swap CLI. +/// +/// The CLI's transport needs the following capabilities: +/// - Establish TCP connections +/// - Resolve DNS entries +/// - Dial onion-addresses through a running Tor daemon by connecting to the +/// socks5 port. If the port is not given, we will fall back to the regular +/// TCP transport. +pub fn new( + identity: &identity::Keypair, + maybe_tor_socks5_port: Option, +) -> Result> { + let tcp = TokioTcpConfig::new().nodelay(true); + let tcp_with_dns = TokioDnsConfig::system(tcp)?; + let maybe_tor_transport = match maybe_tor_socks5_port { + Some(port) => OptionalTransport::some(TorDialOnlyTransport::new(port)), + None => OptionalTransport::none(), + }; + + let transport = maybe_tor_transport.or_transport(tcp_with_dns).boxed(); + + authenticate_and_multiplex(transport, identity) +} diff --git a/swap/src/network/swarm.rs b/swap/src/network/swarm.rs index 2e2a8e1d..37e9f219 100644 --- a/swap/src/network/swarm.rs +++ b/swap/src/network/swarm.rs @@ -1,10 +1,9 @@ -use crate::network::transport; use crate::protocol::alice::event_loop::LatestRate; use crate::protocol::{alice, bob}; use crate::seed::Seed; -use crate::{env, monero, tor}; +use crate::{asb, cli, env, monero, tor}; use anyhow::Result; -use libp2p::swarm::{NetworkBehaviour, SwarmBuilder}; +use libp2p::swarm::SwarmBuilder; use libp2p::{PeerId, Swarm}; use std::fmt::Debug; @@ -22,18 +21,27 @@ pub fn asb( where LR: LatestRate + Send + 'static + Debug, { - with_clear_net( - seed, - alice::Behaviour::new( - balance, - lock_fee, - min_buy, - max_buy, - latest_rate, - resume_only, - env_config, - ), - ) + let behaviour = alice::Behaviour::new( + balance, + lock_fee, + min_buy, + max_buy, + latest_rate, + resume_only, + env_config, + ); + + let identity = seed.derive_libp2p_identity(); + let transport = asb::transport::new(&identity)?; + let peer_id = identity.public().into_peer_id(); + + let swarm = SwarmBuilder::new(transport, behaviour, peer_id) + .executor(Box::new(|f| { + tokio::spawn(f); + })) + .build(); + + Ok(swarm) } pub async fn cli( @@ -41,41 +49,16 @@ pub async fn cli( alice: PeerId, tor_socks5_port: u16, ) -> Result> { - let client = tor::Client::new(tor_socks5_port); - if client.assert_tor_running().await.is_ok() { - return with_tor(seed, bob::Behaviour::new(alice), tor_socks5_port).await; - } - with_clear_net(seed, bob::Behaviour::new(alice)) -} + let maybe_tor_socks5_port = match tor::Client::new(tor_socks5_port).assert_tor_running().await { + Ok(()) => Some(tor_socks5_port), + Err(_) => None, + }; + + let behaviour = bob::Behaviour::new(alice); -fn with_clear_net(seed: &Seed, behaviour: B) -> Result> -where - B: NetworkBehaviour, -{ - tracing::info!("All connections will go through clear net"); let identity = seed.derive_libp2p_identity(); - let transport = transport::build_clear_net(&identity)?; + let transport = cli::transport::new(&identity, maybe_tor_socks5_port)?; let peer_id = identity.public().into_peer_id(); - tracing::debug!(%peer_id, "Our peer-id"); - - let swarm = SwarmBuilder::new(transport, behaviour, peer_id) - .executor(Box::new(|f| { - tokio::spawn(f); - })) - .build(); - - Ok(swarm) -} - -async fn with_tor(seed: &Seed, behaviour: B, tor_socks5_port: u16) -> Result> -where - B: NetworkBehaviour, -{ - tracing::info!("All connections will go through Tor socks5 proxy"); - let identity = seed.derive_libp2p_identity(); - let transport = transport::build_tor(&identity, tor_socks5_port)?; - let peer_id = identity.public().into_peer_id(); - tracing::debug!(%peer_id, "Our peer-id"); let swarm = SwarmBuilder::new(transport, behaviour, peer_id) .executor(Box::new(|f| { diff --git a/swap/src/network/transport.rs b/swap/src/network/transport.rs index c4fa929d..35ac3e8c 100644 --- a/swap/src/network/transport.rs +++ b/swap/src/network/transport.rs @@ -1,74 +1,39 @@ -use crate::network::tor_transport::TorDialOnlyTransport; use anyhow::Result; +use futures::{AsyncRead, AsyncWrite}; use libp2p::core::muxing::StreamMuxerBox; use libp2p::core::transport::Boxed; use libp2p::core::upgrade::{SelectUpgrade, Version}; -use libp2p::dns::TokioDnsConfig; use libp2p::mplex::MplexConfig; use libp2p::noise::{self, NoiseConfig, X25519Spec}; -use libp2p::tcp::TokioTcpConfig; -use libp2p::websocket::WsConfig; use libp2p::{identity, yamux, PeerId, Transport}; use std::time::Duration; -/// Builds a libp2p transport with the following features: -/// - TcpConnection -/// - WebSocketConnection -/// - DNS name resolution -/// - authentication via noise -/// - multiplexing via yamux or mplex -pub fn build_clear_net(id_keys: &identity::Keypair) -> Result { - let dh_keys = noise::Keypair::::new().into_authentic(id_keys)?; - let noise = NoiseConfig::xx(dh_keys).into_authenticated(); +/// "Completes" a transport by applying the authentication and multiplexing +/// upgrades. +/// +/// Even though the actual transport technology in use might be different, for +/// two libp2p applications to be compatible, the authentication and +/// multiplexing upgrades need to be compatible. +pub fn authenticate_and_multiplex( + transport: Boxed, + identity: &identity::Keypair, +) -> Result> +where + T: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + let auth_upgrade = { + let noise_identity = noise::Keypair::::new().into_authentic(identity)?; + NoiseConfig::xx(noise_identity).into_authenticated() + }; + let multiplex_upgrade = SelectUpgrade::new(yamux::YamuxConfig::default(), MplexConfig::new()); - let tcp = TokioTcpConfig::new().nodelay(true); - let dns = TokioDnsConfig::system(tcp)?; - let websocket = WsConfig::new(dns.clone()); - - let transport = websocket - .or_transport(dns) + let transport = transport .upgrade(Version::V1) - .authenticate(noise) - .multiplex(SelectUpgrade::new( - yamux::YamuxConfig::default(), - MplexConfig::new(), - )) + .authenticate(auth_upgrade) + .multiplex(multiplex_upgrade) .timeout(Duration::from_secs(20)) .map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer))) .boxed(); Ok(transport) } - -/// Builds a libp2p transport with the following features: -/// - TorTcpConnection -/// - WebSocketConnection -/// - DNS name resolution -/// - authentication via noise -/// - multiplexing via yamux or mplex -pub fn build_tor(id_keys: &identity::Keypair, tor_socks5_port: u16) -> Result { - let dh_keys = noise::Keypair::::new().into_authentic(id_keys)?; - let noise = NoiseConfig::xx(dh_keys).into_authenticated(); - - let tcp = TokioTcpConfig::new().nodelay(true); - let tcp_with_dns = TokioDnsConfig::system(tcp)?; - let websocket_with_dns = WsConfig::new(tcp_with_dns.clone()); - let tor_dial_only = TorDialOnlyTransport::new(tor_socks5_port); - - let transport = tor_dial_only - .or_transport(tcp_with_dns) - .or_transport(websocket_with_dns) - .upgrade(Version::V1) - .authenticate(noise) - .multiplex(SelectUpgrade::new( - yamux::YamuxConfig::default(), - MplexConfig::new(), - )) - .timeout(Duration::from_secs(20)) - .map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer))) - .boxed(); - - Ok(transport) -} - -pub type SwapTransport = Boxed<(PeerId, StreamMuxerBox)>; From ff8cd0ab931a75fda9e95de5e5125bdb2dcb9666 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 9 Jun 2021 14:13:04 +1000 Subject: [PATCH 8/9] Harmonise log statements between applications The refactoring of the transport initialization removed the log statement for the asb. We re-introduce this log statement in main for consistency. --- swap/src/bin/asb.rs | 4 ++-- swap/src/bin/swap.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index f8212ede..ef590dd1 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -163,6 +163,8 @@ async fn main() -> Result<()> { .with_context(|| format!("Failed to listen on network interface {}", listen))?; } + tracing::info!(peer_id = %swarm.local_peer_id(), "Network layer initialized"); + let (event_loop, mut swap_receiver) = EventLoop::new( swarm, env_config, @@ -192,8 +194,6 @@ async fn main() -> Result<()> { } }); - info!(peer_id = %event_loop.peer_id(), "Our peer-id"); - event_loop.run().await; } Command::History => { diff --git a/swap/src/bin/swap.rs b/swap/src/bin/swap.rs index 23190493..abf98128 100644 --- a/swap/src/bin/swap.rs +++ b/swap/src/bin/swap.rs @@ -90,8 +90,8 @@ async fn main() -> Result<()> { .behaviour_mut() .add_address(seller_peer_id, seller_addr); - let our_peer_id = swarm.local_peer_id(); - tracing::debug!(peer_id = %our_peer_id, "Initializing network module"); + tracing::debug!(peer_id = %swarm.local_peer_id(), "Network layer initialized"); + let (event_loop, mut event_loop_handle) = EventLoop::new( swap_id, swarm, From 1ba186aa635bb34793a2b9ff92a82a8d6960b65a Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 9 Jun 2021 14:21:15 +1000 Subject: [PATCH 9/9] Make log message a proper statement "Listening on" is only half a sentence. --- swap/src/protocol/alice/event_loop.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 832f658c..49f32d73 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -335,7 +335,7 @@ where } } SwarmEvent::NewListenAddr(address) => { - tracing::info!(%address, "Listening on"); + tracing::info!(%address, "New listen address detected"); } _ => {} }