558: Remove websocket transport from CLI r=thomaseizinger a=thomaseizinger

I've also incorporated some of the ideas in how to modularize the Tor integration with libp2p. For more details, please see the individual patches.

Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
This commit is contained in:
bors[bot] 2021-06-21 00:50:51 +00:00 committed by GitHub
commit e50de7dc99
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 186 additions and 211 deletions

View File

@ -12,6 +12,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Printing the deposit address to the terminal as a QR code.
To not break automated scripts or integrations with other software, this behaviour is disabled if `--json` is passed to the application.
### Removed
- The websocket transport from the CLI.
Websockets were only ever intended to be used for the ASB side to allow websites to retrieve quotes.
The CLI can use regular TCP connections and having both - TCP and websockets - causes problems and unnecessary overhead.
## [0.7.0] - 2021-05-28
### Fixed

View File

@ -2,5 +2,6 @@ pub mod command;
pub mod config;
mod rate;
pub mod tracing;
pub mod transport;
pub use rate::Rate;

19
swap/src/asb/transport.rs Normal file
View File

@ -0,0 +1,19 @@
use crate::network::transport::authenticate_and_multiplex;
use anyhow::Result;
use libp2p::core::muxing::StreamMuxerBox;
use libp2p::core::transport::Boxed;
use libp2p::dns::TokioDnsConfig;
use libp2p::tcp::TokioTcpConfig;
use libp2p::websocket::WsConfig;
use libp2p::{identity, PeerId, Transport};
/// Creates the libp2p transport for the ASB.
pub fn new(identity: &identity::Keypair) -> Result<Boxed<(PeerId, StreamMuxerBox)>> {
let tcp = TokioTcpConfig::new().nodelay(true);
let tcp_with_dns = TokioDnsConfig::system(tcp)?;
let websocket_with_dns = WsConfig::new(tcp_with_dns.clone());
let transport = tcp_with_dns.or_transport(websocket_with_dns).boxed();
authenticate_and_multiplex(transport, identity)
}

View File

@ -147,7 +147,7 @@ async fn main() -> Result<()> {
let current_balance = monero_wallet.get_balance().await?;
let lock_fee = monero_wallet.static_tx_fee_estimate();
let kraken_rate = KrakenRate::new(config.maker.ask_spread, kraken_price_updates);
let mut swarm = swarm::alice(
let mut swarm = swarm::asb(
&seed,
current_balance,
lock_fee,
@ -163,6 +163,8 @@ async fn main() -> Result<()> {
.with_context(|| format!("Failed to listen on network interface {}", listen))?;
}
tracing::info!(peer_id = %swarm.local_peer_id(), "Network layer initialized");
let (event_loop, mut swap_receiver) = EventLoop::new(
swarm,
env_config,
@ -192,8 +194,6 @@ async fn main() -> Result<()> {
}
});
info!(peer_id = %event_loop.peer_id(), "Our peer-id");
event_loop.run().await;
}
Command::History => {

View File

@ -85,13 +85,13 @@ async fn main() -> Result<()> {
init_monero_wallet(data_dir, monero_daemon_address, env_config).await?;
let bitcoin_wallet = Arc::new(bitcoin_wallet);
let mut swarm = swarm::bob(&seed, seller_peer_id, tor_socks5_port).await?;
let mut swarm = swarm::cli(&seed, seller_peer_id, tor_socks5_port).await?;
swarm
.behaviour_mut()
.add_address(seller_peer_id, seller_addr);
let our_peer_id = swarm.local_peer_id();
tracing::debug!(peer_id = %our_peer_id, "Initializing network module");
tracing::debug!(peer_id = %swarm.local_peer_id(), "Network layer initialized");
let (event_loop, mut event_loop_handle) = EventLoop::new(
swap_id,
swarm,
@ -185,7 +185,7 @@ async fn main() -> Result<()> {
let seller_peer_id = db.get_peer_id(swap_id)?;
let mut swarm = swarm::bob(&seed, seller_peer_id, tor_socks5_port).await?;
let mut swarm = swarm::cli(&seed, seller_peer_id, tor_socks5_port).await?;
let our_peer_id = swarm.local_peer_id();
tracing::debug!(peer_id = %our_peer_id, "Initializing network module");
swarm

View File

@ -1,2 +1,3 @@
pub mod command;
pub mod tracing;
pub mod transport;

32
swap/src/cli/transport.rs Normal file
View File

@ -0,0 +1,32 @@
use crate::network::tor_transport::TorDialOnlyTransport;
use crate::network::transport::authenticate_and_multiplex;
use anyhow::Result;
use libp2p::core::muxing::StreamMuxerBox;
use libp2p::core::transport::{Boxed, OptionalTransport};
use libp2p::dns::TokioDnsConfig;
use libp2p::tcp::TokioTcpConfig;
use libp2p::{identity, PeerId, Transport};
/// Creates the libp2p transport for the swap CLI.
///
/// The CLI's transport needs the following capabilities:
/// - Establish TCP connections
/// - Resolve DNS entries
/// - Dial onion-addresses through a running Tor daemon by connecting to the
/// socks5 port. If the port is not given, we will fall back to the regular
/// TCP transport.
pub fn new(
identity: &identity::Keypair,
maybe_tor_socks5_port: Option<u16>,
) -> Result<Boxed<(PeerId, StreamMuxerBox)>> {
let tcp = TokioTcpConfig::new().nodelay(true);
let tcp_with_dns = TokioDnsConfig::system(tcp)?;
let maybe_tor_transport = match maybe_tor_socks5_port {
Some(port) => OptionalTransport::some(TorDialOnlyTransport::new(port)),
None => OptionalTransport::none(),
};
let transport = maybe_tor_transport.or_transport(tcp_with_dns).boxed();
authenticate_and_multiplex(transport, identity)
}

View File

@ -1,15 +1,14 @@
use crate::network::transport;
use crate::protocol::alice::event_loop::LatestRate;
use crate::protocol::{alice, bob};
use crate::seed::Seed;
use crate::{env, monero, tor};
use crate::{asb, cli, env, monero, tor};
use anyhow::Result;
use libp2p::swarm::{NetworkBehaviour, SwarmBuilder};
use libp2p::swarm::SwarmBuilder;
use libp2p::{PeerId, Swarm};
use std::fmt::Debug;
#[allow(clippy::too_many_arguments)]
pub fn alice<LR>(
pub fn asb<LR>(
seed: &Seed,
balance: monero::Amount,
lock_fee: monero::Amount,
@ -22,60 +21,44 @@ pub fn alice<LR>(
where
LR: LatestRate + Send + 'static + Debug,
{
with_clear_net(
seed,
alice::Behaviour::new(
balance,
lock_fee,
min_buy,
max_buy,
latest_rate,
resume_only,
env_config,
),
)
let behaviour = alice::Behaviour::new(
balance,
lock_fee,
min_buy,
max_buy,
latest_rate,
resume_only,
env_config,
);
let identity = seed.derive_libp2p_identity();
let transport = asb::transport::new(&identity)?;
let peer_id = identity.public().into_peer_id();
let swarm = SwarmBuilder::new(transport, behaviour, peer_id)
.executor(Box::new(|f| {
tokio::spawn(f);
}))
.build();
Ok(swarm)
}
pub async fn bob(
pub async fn cli(
seed: &Seed,
alice: PeerId,
tor_socks5_port: u16,
) -> Result<Swarm<bob::Behaviour>> {
let client = tor::Client::new(tor_socks5_port);
if client.assert_tor_running().await.is_ok() {
return with_tor(seed, bob::Behaviour::new(alice), tor_socks5_port).await;
}
with_clear_net(seed, bob::Behaviour::new(alice))
}
let maybe_tor_socks5_port = match tor::Client::new(tor_socks5_port).assert_tor_running().await {
Ok(()) => Some(tor_socks5_port),
Err(_) => None,
};
let behaviour = bob::Behaviour::new(alice);
fn with_clear_net<B>(seed: &Seed, behaviour: B) -> Result<Swarm<B>>
where
B: NetworkBehaviour,
{
tracing::info!("All connections will go through clear net");
let identity = seed.derive_libp2p_identity();
let transport = transport::build_clear_net(&identity)?;
let transport = cli::transport::new(&identity, maybe_tor_socks5_port)?;
let peer_id = identity.public().into_peer_id();
tracing::debug!(%peer_id, "Our peer-id");
let swarm = SwarmBuilder::new(transport, behaviour, peer_id)
.executor(Box::new(|f| {
tokio::spawn(f);
}))
.build();
Ok(swarm)
}
async fn with_tor<B>(seed: &Seed, behaviour: B, tor_socks5_port: u16) -> Result<Swarm<B>>
where
B: NetworkBehaviour,
{
tracing::info!("All connections will go through Tor socks5 proxy");
let identity = seed.derive_libp2p_identity();
let transport = transport::build_tor(&identity, tor_socks5_port)?;
let peer_id = identity.public().into_peer_id();
tracing::debug!(%peer_id, "Our peer-id");
let swarm = SwarmBuilder::new(transport, behaviour, peer_id)
.executor(Box::new(|f| {

View File

@ -1,83 +1,70 @@
use anyhow::anyhow;
use anyhow::Result;
use data_encoding::BASE32;
use futures::future::Ready;
use futures::prelude::*;
use futures::future::{BoxFuture, FutureExt, Ready};
use libp2p::core::multiaddr::{Multiaddr, Protocol};
use libp2p::core::transport::TransportError;
use libp2p::core::Transport;
use libp2p::tcp::tokio::{Tcp, TcpStream};
use libp2p::tcp::{GenTcpConfig, TcpListenStream, TokioTcpConfig};
use libp2p::tcp::TcpListenStream;
use std::io;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::pin::Pin;
use std::net::Ipv4Addr;
use tokio_socks::tcp::Socks5Stream;
use tokio_socks::IntoTargetAddr;
/// Represents the configuration for a Tor transport for libp2p.
/// A [`Transport`] that can dial onion addresses through a running Tor daemon.
#[derive(Clone)]
pub struct TorTcpConfig {
inner: GenTcpConfig<Tcp>,
/// Tor SOCKS5 proxy port number.
pub struct TorDialOnlyTransport {
socks_port: u16,
}
impl TorTcpConfig {
pub fn new(tcp: TokioTcpConfig, socks_port: u16) -> Self {
Self {
inner: tcp,
socks_port,
}
impl TorDialOnlyTransport {
pub fn new(socks_port: u16) -> Self {
Self { socks_port }
}
}
impl Transport for TorTcpConfig {
impl Transport for TorDialOnlyTransport {
type Output = TcpStream;
type Error = io::Error;
type Listener = TcpListenStream<Tcp>;
type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
#[allow(clippy::type_complexity)]
type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
type Dial = BoxFuture<'static, Result<Self::Output, Self::Error>>;
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
self.inner.listen_on(addr)
Err(TransportError::MultiaddrNotSupported(addr))
}
// dials via Tor's socks5 proxy if configured and if the provided address is an
// onion address. or it falls back to Tcp dialling
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
async fn do_tor_dial(socks_port: u16, dest: String) -> Result<TcpStream, io::Error> {
tracing::trace!("Connecting through Tor proxy to address: {}", dest);
let stream = connect_to_socks_proxy(dest, socks_port)
.await
.map_err(|e| io::Error::new(io::ErrorKind::ConnectionRefused, e))?;
tracing::trace!("Connection through Tor established");
Ok(stream)
}
let tor_address_string = fmt_as_address_string(addr.clone())?;
match to_address_string(addr.clone()) {
Ok(tor_address_string) => {
Ok(Box::pin(do_tor_dial(self.socks_port, tor_address_string)))
}
Err(error) => {
tracing::warn!(
address = %addr,
"Address could not be formatted. Dialling via clear net. Error {:#}", error,
);
self.inner.dial(addr)
}
}
let dial_future = async move {
tracing::trace!("Connecting through Tor proxy to address: {}", addr);
let stream =
Socks5Stream::connect((Ipv4Addr::LOCALHOST, self.socks_port), tor_address_string)
.await
.map_err(|e| io::Error::new(io::ErrorKind::ConnectionRefused, e))?;
tracing::trace!("Connection through Tor established");
Ok(TcpStream(stream.into_inner()))
};
Ok(dial_future.boxed())
}
fn address_translation(&self, listen: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.address_translation(listen, observed)
fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option<Multiaddr> {
None
}
}
/// Tor expects an address format of ADDR:PORT.
/// This helper function tries to convert the provided multi-address into this
/// format. None is returned if an unsupported protocol was provided.
fn to_address_string(multi: Multiaddr) -> anyhow::Result<String> {
/// Formats the given [`Multiaddr`] as an "address" string.
///
/// For our purposes, we define an address as {HOST}(.{TLD}):{PORT}. This format
/// is what is compatible with the Tor daemon and allows us to route traffic
/// through Tor.
fn fmt_as_address_string(multi: Multiaddr) -> Result<String, TransportError<io::Error>> {
let mut protocols = multi.iter();
let address_string = match protocols.next() {
// if it is an Onion address, we have all we need and can return
Some(Protocol::Onion3(addr)) => {
@ -88,53 +75,33 @@ fn to_address_string(multi: Multiaddr) -> anyhow::Result<String> {
))
}
// Deal with non-onion addresses
Some(Protocol::Ip4(addr)) => Some(format!("{}", addr)),
Some(Protocol::Ip6(addr)) => Some(format!("{}", addr)),
Some(Protocol::Dns(addr)) => Some(format!("{}", addr)),
Some(Protocol::Dns4(addr)) => Some(format!("{}", addr)),
_ => None,
}
.ok_or_else(|| {
anyhow!(
"Could not format address {}. Please consider reporting this issue. ",
multi
)
})?;
let port_string = match protocols.next() {
Some(Protocol::Tcp(port)) => Some(format!("{}", port)),
Some(Protocol::Udp(port)) => Some(format!("{}", port)),
_ => None,
Some(Protocol::Ip4(addr)) => format!("{}", addr),
Some(Protocol::Ip6(addr)) => format!("{}", addr),
Some(Protocol::Dns(addr)) => format!("{}", addr),
Some(Protocol::Dns4(addr)) => format!("{}", addr),
_ => return Err(TransportError::MultiaddrNotSupported(multi)),
};
if let Some(port) = port_string {
Ok(format!("{}:{}", address_string, port))
} else {
Ok(address_string)
}
}
let port = match protocols.next() {
Some(Protocol::Tcp(port)) => port,
Some(Protocol::Udp(port)) => port,
_ => return Err(TransportError::MultiaddrNotSupported(multi)),
};
/// Connect to the SOCKS5 proxy socket.
async fn connect_to_socks_proxy<'a>(
dest: impl IntoTargetAddr<'a>,
port: u16,
) -> Result<TcpStream, tokio_socks::Error> {
let sock = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port));
let stream = Socks5Stream::connect(sock, dest).await?;
Ok(TcpStream(stream.into_inner()))
Ok(format!("{}:{}", address_string, port))
}
#[cfg(test)]
pub mod test {
use crate::network::tor_transport::to_address_string;
use super::*;
#[test]
fn test_tor_address_string() {
let address =
"/onion3/oarchy4tamydxcitaki6bc2v4leza6v35iezmu2chg2bap63sv6f2did:1024/p2p/12D3KooWPD4uHN74SHotLN7VCH7Fm8zZgaNVymYcpeF1fpD2guc9"
;
let address_string =
to_address_string(address.parse().unwrap()).expect("To be a multi formatted address.");
let address_string = fmt_as_address_string(address.parse().unwrap())
.expect("To be a multi formatted address.");
assert_eq!(
address_string,
"oarchy4tamydxcitaki6bc2v4leza6v35iezmu2chg2bap63sv6f2did.onion:1024"
@ -144,55 +111,55 @@ pub mod test {
#[test]
fn tcp_to_address_string_should_be_some() {
let address = "/ip4/127.0.0.1/tcp/7777";
let address_string =
to_address_string(address.parse().unwrap()).expect("To be a formatted multi address. ");
let address_string = fmt_as_address_string(address.parse().unwrap())
.expect("To be a formatted multi address. ");
assert_eq!(address_string, "127.0.0.1:7777");
}
#[test]
fn ip6_to_address_string_should_be_some() {
let address = "/ip6/2001:db8:85a3:8d3:1319:8a2e:370:7348/tcp/7777";
let address_string =
to_address_string(address.parse().unwrap()).expect("To be a formatted multi address. ");
let address_string = fmt_as_address_string(address.parse().unwrap())
.expect("To be a formatted multi address. ");
assert_eq!(address_string, "2001:db8:85a3:8d3:1319:8a2e:370:7348:7777");
}
#[test]
fn udp_to_address_string_should_be_some() {
let address = "/ip4/127.0.0.1/udp/7777";
let address_string =
to_address_string(address.parse().unwrap()).expect("To be a formatted multi address. ");
let address_string = fmt_as_address_string(address.parse().unwrap())
.expect("To be a formatted multi address. ");
assert_eq!(address_string, "127.0.0.1:7777");
}
#[test]
fn ws_to_address_string_should_be_some() {
let address = "/ip4/127.0.0.1/tcp/7777/ws";
let address_string =
to_address_string(address.parse().unwrap()).expect("To be a formatted multi address. ");
let address_string = fmt_as_address_string(address.parse().unwrap())
.expect("To be a formatted multi address. ");
assert_eq!(address_string, "127.0.0.1:7777");
}
#[test]
fn dns4_to_address_string_should_be_some() {
let address = "/dns4/randomdomain.com/tcp/7777";
let address_string =
to_address_string(address.parse().unwrap()).expect("To be a formatted multi address. ");
let address_string = fmt_as_address_string(address.parse().unwrap())
.expect("To be a formatted multi address. ");
assert_eq!(address_string, "randomdomain.com:7777");
}
#[test]
fn dns_to_address_string_should_be_some() {
let address = "/dns/randomdomain.com/tcp/7777";
let address_string =
to_address_string(address.parse().unwrap()).expect("To be a formatted multi address. ");
let address_string = fmt_as_address_string(address.parse().unwrap())
.expect("To be a formatted multi address. ");
assert_eq!(address_string, "randomdomain.com:7777");
}
#[test]
fn dnsaddr_to_address_string_should_be_none() {
let address = "/dnsaddr/randomdomain.com";
let address_string = to_address_string(address.parse().unwrap()).ok();
let address_string = fmt_as_address_string(address.parse().unwrap()).ok();
assert_eq!(address_string, None);
}
}

View File

@ -1,73 +1,39 @@
use crate::network::tor_transport::TorTcpConfig;
use anyhow::Result;
use futures::{AsyncRead, AsyncWrite};
use libp2p::core::muxing::StreamMuxerBox;
use libp2p::core::transport::Boxed;
use libp2p::core::upgrade::{SelectUpgrade, Version};
use libp2p::dns::TokioDnsConfig;
use libp2p::mplex::MplexConfig;
use libp2p::noise::{self, NoiseConfig, X25519Spec};
use libp2p::tcp::TokioTcpConfig;
use libp2p::websocket::WsConfig;
use libp2p::{identity, yamux, PeerId, Transport};
use std::time::Duration;
/// Builds a libp2p transport with the following features:
/// - TcpConnection
/// - WebSocketConnection
/// - DNS name resolution
/// - authentication via noise
/// - multiplexing via yamux or mplex
pub fn build_clear_net(id_keys: &identity::Keypair) -> Result<SwapTransport> {
let dh_keys = noise::Keypair::<X25519Spec>::new().into_authentic(id_keys)?;
let noise = NoiseConfig::xx(dh_keys).into_authenticated();
/// "Completes" a transport by applying the authentication and multiplexing
/// upgrades.
///
/// Even though the actual transport technology in use might be different, for
/// two libp2p applications to be compatible, the authentication and
/// multiplexing upgrades need to be compatible.
pub fn authenticate_and_multiplex<T>(
transport: Boxed<T>,
identity: &identity::Keypair,
) -> Result<Boxed<(PeerId, StreamMuxerBox)>>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let auth_upgrade = {
let noise_identity = noise::Keypair::<X25519Spec>::new().into_authentic(identity)?;
NoiseConfig::xx(noise_identity).into_authenticated()
};
let multiplex_upgrade = SelectUpgrade::new(yamux::YamuxConfig::default(), MplexConfig::new());
let tcp = TokioTcpConfig::new().nodelay(true);
let dns = TokioDnsConfig::system(tcp)?;
let websocket = WsConfig::new(dns.clone());
let transport = websocket
.or_transport(dns)
let transport = transport
.upgrade(Version::V1)
.authenticate(noise)
.multiplex(SelectUpgrade::new(
yamux::YamuxConfig::default(),
MplexConfig::new(),
))
.authenticate(auth_upgrade)
.multiplex(multiplex_upgrade)
.timeout(Duration::from_secs(20))
.map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer)))
.boxed();
Ok(transport)
}
/// Builds a libp2p transport with the following features:
/// - TorTcpConnection
/// - WebSocketConnection
/// - DNS name resolution
/// - authentication via noise
/// - multiplexing via yamux or mplex
pub fn build_tor(id_keys: &identity::Keypair, tor_socks5_port: u16) -> Result<SwapTransport> {
let dh_keys = noise::Keypair::<X25519Spec>::new().into_authentic(id_keys)?;
let noise = NoiseConfig::xx(dh_keys).into_authenticated();
let tcp = TokioTcpConfig::new().nodelay(true);
let tcp = TorTcpConfig::new(tcp, tor_socks5_port);
let dns = TokioDnsConfig::system(tcp)?;
let websocket = WsConfig::new(dns.clone());
let transport = websocket
.or_transport(dns)
.upgrade(Version::V1)
.authenticate(noise)
.multiplex(SelectUpgrade::new(
yamux::YamuxConfig::default(),
MplexConfig::new(),
))
.timeout(Duration::from_secs(20))
.map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer)))
.boxed();
Ok(transport)
}
pub type SwapTransport = Boxed<(PeerId, StreamMuxerBox)>;

View File

@ -335,7 +335,7 @@ where
}
}
SwarmEvent::NewListenAddr(address) => {
tracing::info!(%address, "Listening on");
tracing::info!(%address, "New listen address detected");
}
_ => {}
}

View File

@ -231,7 +231,7 @@ async fn start_alice(
let latest_rate = FixedRate::default();
let resume_only = false;
let mut swarm = swarm::alice(
let mut swarm = swarm::asb(
&seed,
current_balance,
lock_fee,
@ -449,7 +449,7 @@ impl BobParams {
) -> Result<(bob::EventLoop, bob::EventLoopHandle)> {
let tor_socks5_port = get_port()
.expect("We don't care about Tor in the tests so we get a free port to disable it.");
let mut swarm = swarm::bob(&self.seed, self.alice_peer_id, tor_socks5_port).await?;
let mut swarm = swarm::cli(&self.seed, self.alice_peer_id, tor_socks5_port).await?;
swarm
.behaviour_mut()
.add_address(self.alice_peer_id, self.alice_address.clone());