From 93a0692998c7ce3839cd417a11624c88d312add2 Mon Sep 17 00:00:00 2001 From: rishflab Date: Tue, 29 Jun 2021 14:53:26 +1000 Subject: [PATCH] Integrate rendezvous protocol into ASB The rendezvous protocol allows us to register all of our external addresses. Hence, the first step is to allow the user to configure external addresses as part of the config. In the future, there might be an automated way of determining these. To register with a rendezvous node, the user needs to configure which one. CoBloX is running a rendezvous node that acts as the default by every spec-compliant node will do the job just fine. This behaviour is optional which is why our custom behaviour is wrapped in a `Toggle`. We also want our node to re-register after half the time of the registration has passed. To make this simpler and allow for testing in isolation, we create a custom behaviour that wraps the libp2p rendezvous behaviour. --- swap/src/asb/config.rs | 23 +++ swap/src/asb/event_loop.rs | 13 +- swap/src/asb/network.rs | 340 ++++++++++++++++++++++++++++++++++--- swap/src/bin/asb.rs | 24 ++- swap/src/network/swarm.rs | 29 +++- swap/tests/harness/mod.rs | 1 + 6 files changed, 400 insertions(+), 30 deletions(-) diff --git a/swap/src/asb/config.rs b/swap/src/asb/config.rs index 391e4b41..1ecbebe0 100644 --- a/swap/src/asb/config.rs +++ b/swap/src/asb/config.rs @@ -1,5 +1,6 @@ use crate::env::{Mainnet, Testnet}; use crate::fs::{ensure_directory_exists, system_config_dir, system_data_dir}; +use crate::network::rendezvous::DEFAULT_RENDEZVOUS_ADDRESS; use crate::tor::{DEFAULT_CONTROL_PORT, DEFAULT_SOCKS5_PORT}; use anyhow::{bail, Context, Result}; use config::ConfigError; @@ -86,6 +87,7 @@ const DEFAULT_MAX_BUY_AMOUNT: f64 = 0.02f64; const DEFAULT_SPREAD: f64 = 0.02f64; #[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)] +#[serde(deny_unknown_fields)] pub struct Config { pub data: Data, pub network: Network, @@ -118,6 +120,10 @@ pub struct Data { #[serde(deny_unknown_fields)] pub struct Network { pub listen: Vec, + #[serde(default)] + pub rendezvous_point: Option, + #[serde(default)] + pub external_addresses: Vec, } #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] @@ -285,12 +291,25 @@ pub fn query_user_for_initial_config(testnet: bool) -> Result { } let ask_spread = Decimal::from_f64(ask_spread).context("Unable to parse spread")?; + let rendezvous_address = Input::with_theme(&ColorfulTheme::default()) + .with_prompt("Do you want to advertise your ASB instance with a rendezvous node? Enter an empty string if not.") + .default(DEFAULT_RENDEZVOUS_ADDRESS.to_string()) + .interact_text()?; + + let rendezvous_point = if rendezvous_address.is_empty() { + None + } else { + Some(Multiaddr::from_str(&rendezvous_address)?) + }; + println!(); Ok(Config { data: Data { dir: data_dir }, network: Network { listen: listen_addresses, + rendezvous_point, + external_addresses: vec![], }, bitcoin: Bitcoin { electrum_rpc_url, @@ -340,6 +359,8 @@ mod tests { }, network: Network { listen: vec![defaults.listen_address_tcp, defaults.listen_address_ws], + rendezvous_point: Some(DEFAULT_RENDEZVOUS_ADDRESS.parse().unwrap()), + external_addresses: vec![], }, monero: Monero { @@ -381,6 +402,8 @@ mod tests { }, network: Network { listen: vec![defaults.listen_address_tcp, defaults.listen_address_ws], + rendezvous_point: Some(DEFAULT_RENDEZVOUS_ADDRESS.parse().unwrap()), + external_addresses: vec![], }, monero: Monero { diff --git a/swap/src/asb/event_loop.rs b/swap/src/asb/event_loop.rs index 9aaa1a14..1a8663cb 100644 --- a/swap/src/asb/event_loop.rs +++ b/swap/src/asb/event_loop.rs @@ -1,11 +1,10 @@ use crate::asb::{Behaviour, OutEvent, Rate}; use crate::database::Database; -use crate::env::Config; use crate::network::quote::BidQuote; use crate::network::swap_setup::alice::WalletSnapshot; use crate::network::transfer_proof; use crate::protocol::alice::{AliceState, State3, Swap}; -use crate::{bitcoin, kraken, monero}; +use crate::{bitcoin, env, kraken, monero}; use anyhow::{Context, Result}; use futures::future; use futures::future::{BoxFuture, FutureExt}; @@ -37,7 +36,7 @@ where LR: LatestRate + Send + 'static + Debug + Clone, { swarm: libp2p::Swarm>, - env_config: Config, + env_config: env::Config, bitcoin_wallet: Arc, monero_wallet: Arc, db: Arc, @@ -69,7 +68,7 @@ where #[allow(clippy::too_many_arguments)] pub fn new( swarm: Swarm>, - env_config: Config, + env_config: env::Config, bitcoin_wallet: Arc, monero_wallet: Arc, db: Arc, @@ -245,6 +244,12 @@ where channel }.boxed()); } + SwarmEvent::Behaviour(OutEvent::Rendezvous(libp2p::rendezvous::Event::Registered { .. })) => { + tracing::info!("Successfully registered with rendezvous node"); + } + SwarmEvent::Behaviour(OutEvent::Rendezvous(libp2p::rendezvous::Event::RegisterFailed(error))) => { + tracing::error!("Registration with rendezvous node failed: {:#}", error); + } SwarmEvent::Behaviour(OutEvent::Failure {peer, error}) => { tracing::error!( %peer, diff --git a/swap/src/asb/network.rs b/swap/src/asb/network.rs index 60dd594f..b6e621c0 100644 --- a/swap/src/asb/network.rs +++ b/swap/src/asb/network.rs @@ -1,12 +1,33 @@ +use crate::asb::event_loop::LatestRate; +use crate::env; +use crate::network::quote::BidQuote; +use crate::network::rendezvous::XmrBtcNamespace; +use crate::network::swap_setup::alice; +use crate::network::swap_setup::alice::WalletSnapshot; +use crate::network::transport::authenticate_and_multiplex; +use crate::network::{encrypted_signature, quote, transfer_proof}; +use crate::protocol::alice::State3; +use anyhow::{anyhow, Error, Result}; +use futures::FutureExt; +use libp2p::core::connection::ConnectionId; +use libp2p::core::muxing::StreamMuxerBox; +use libp2p::core::transport::Boxed; +use libp2p::dns::TokioDnsConfig; +use libp2p::ping::{Ping, PingEvent}; +use libp2p::request_response::{RequestId, ResponseChannel}; +use libp2p::swarm::{ + DialPeerCondition, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, + PollParameters, ProtocolsHandler, +}; +use libp2p::tcp::TokioTcpConfig; +use libp2p::websocket::WsConfig; +use libp2p::{identity, Multiaddr, NetworkBehaviour, PeerId, Transport}; +use std::task::Poll; +use std::time::Duration; +use uuid::Uuid; + pub mod transport { - use crate::network::transport::authenticate_and_multiplex; - use anyhow::Result; - use libp2p::core::muxing::StreamMuxerBox; - use libp2p::core::transport::Boxed; - use libp2p::dns::TokioDnsConfig; - use libp2p::tcp::TokioTcpConfig; - use libp2p::websocket::WsConfig; - use libp2p::{identity, PeerId, Transport}; + use super::*; /// Creates the libp2p transport for the ASB. pub fn new(identity: &identity::Keypair) -> Result> { @@ -21,18 +42,7 @@ pub mod transport { } pub mod behaviour { - use crate::asb::event_loop::LatestRate; - use crate::env; - use crate::network::quote::BidQuote; - use crate::network::swap_setup::alice; - use crate::network::swap_setup::alice::WalletSnapshot; - use crate::network::{encrypted_signature, quote, transfer_proof}; - use crate::protocol::alice::State3; - use anyhow::{anyhow, Error}; - use libp2p::ping::{Ping, PingEvent}; - use libp2p::request_response::{RequestId, ResponseChannel}; - use libp2p::{NetworkBehaviour, PeerId}; - use uuid::Uuid; + use super::*; #[allow(clippy::large_enum_variant)] #[derive(Debug)] @@ -62,6 +72,7 @@ pub mod behaviour { channel: ResponseChannel<()>, peer: PeerId, }, + Rendezvous(libp2p::rendezvous::Event), Failure { peer: PeerId, error: Error, @@ -95,6 +106,7 @@ pub mod behaviour { where LR: LatestRate + Send + 'static, { + pub rendezvous: libp2p::swarm::toggle::Toggle, pub quote: quote::Behaviour, pub swap_setup: alice::Behaviour, pub transfer_proof: transfer_proof::Behaviour, @@ -116,8 +128,20 @@ pub mod behaviour { latest_rate: LR, resume_only: bool, env_config: env::Config, + rendezvous_params: Option<(identity::Keypair, PeerId, Multiaddr, XmrBtcNamespace)>, ) -> Self { Self { + rendezvous: libp2p::swarm::toggle::Toggle::from(rendezvous_params.map( + |(identity, rendezvous_peer_id, rendezvous_address, namespace)| { + rendezous::Behaviour::new( + identity, + rendezvous_peer_id, + rendezvous_address, + namespace, + None, // use default ttl on rendezvous point + ) + }, + )), quote: quote::asb(), swap_setup: alice::Behaviour::new( min_buy, @@ -138,4 +162,280 @@ pub mod behaviour { OutEvent::Other } } + + impl From for OutEvent { + fn from(event: libp2p::rendezvous::Event) -> Self { + OutEvent::Rendezvous(event) + } + } +} + +mod rendezous { + use super::*; + use std::pin::Pin; + + #[derive(PartialEq)] + enum ConnectionStatus { + Disconnected, + Dialling, + Connected, + } + + enum RegistrationStatus { + RegisterOnNextConnection, + Pending, + Registered { + re_register_in: Pin>, + }, + } + + pub struct Behaviour { + inner: libp2p::rendezvous::Rendezvous, + rendezvous_point: Multiaddr, + rendezvous_peer_id: PeerId, + namespace: XmrBtcNamespace, + registration_status: RegistrationStatus, + connection_status: ConnectionStatus, + registration_ttl: Option, + } + + impl Behaviour { + pub fn new( + identity: identity::Keypair, + rendezvous_peer_id: PeerId, + rendezvous_address: Multiaddr, + namespace: XmrBtcNamespace, + registration_ttl: Option, + ) -> Self { + Self { + inner: libp2p::rendezvous::Rendezvous::new( + identity, + libp2p::rendezvous::Config::default(), + ), + rendezvous_point: rendezvous_address, + rendezvous_peer_id, + namespace, + registration_status: RegistrationStatus::RegisterOnNextConnection, + connection_status: ConnectionStatus::Disconnected, + registration_ttl, + } + } + + fn register(&mut self) { + self.inner.register( + self.namespace.into(), + self.rendezvous_peer_id, + self.registration_ttl, + ); + } + } + + impl NetworkBehaviour for Behaviour { + type ProtocolsHandler = + ::ProtocolsHandler; + type OutEvent = libp2p::rendezvous::Event; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + self.inner.new_handler() + } + + fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { + if peer_id == &self.rendezvous_peer_id { + return vec![self.rendezvous_point.clone()]; + } + + vec![] + } + + fn inject_connected(&mut self, peer_id: &PeerId) { + if peer_id == &self.rendezvous_peer_id { + self.connection_status = ConnectionStatus::Connected; + + match &self.registration_status { + RegistrationStatus::RegisterOnNextConnection => { + self.register(); + self.registration_status = RegistrationStatus::Pending; + } + RegistrationStatus::Registered { .. } => {} + RegistrationStatus::Pending => {} + } + } + } + + fn inject_disconnected(&mut self, peer_id: &PeerId) { + if peer_id == &self.rendezvous_peer_id { + self.connection_status = ConnectionStatus::Disconnected; + } + } + + fn inject_event( + &mut self, + peer_id: PeerId, + connection: ConnectionId, + event: <::Handler as ProtocolsHandler>::OutEvent, + ) { + self.inner.inject_event(peer_id, connection, event) + } + + fn inject_dial_failure(&mut self, peer_id: &PeerId) { + if peer_id == &self.rendezvous_peer_id { + self.connection_status = ConnectionStatus::Disconnected; + } + } + + #[allow(clippy::type_complexity)] + fn poll(&mut self, cx: &mut std::task::Context<'_>, params: &mut impl PollParameters) -> Poll::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>>{ + match &mut self.registration_status { + RegistrationStatus::RegisterOnNextConnection => match self.connection_status { + ConnectionStatus::Disconnected => { + self.connection_status = ConnectionStatus::Dialling; + + return Poll::Ready(NetworkBehaviourAction::DialPeer { + peer_id: self.rendezvous_peer_id, + condition: DialPeerCondition::Disconnected, + }); + } + ConnectionStatus::Dialling => {} + ConnectionStatus::Connected => { + self.registration_status = RegistrationStatus::Pending; + self.register(); + } + }, + RegistrationStatus::Registered { re_register_in } => { + if let Poll::Ready(()) = re_register_in.poll_unpin(cx) { + match self.connection_status { + ConnectionStatus::Connected => { + self.registration_status = RegistrationStatus::Pending; + self.register(); + } + ConnectionStatus::Disconnected => { + self.registration_status = + RegistrationStatus::RegisterOnNextConnection; + + return Poll::Ready(NetworkBehaviourAction::DialPeer { + peer_id: self.rendezvous_peer_id, + condition: DialPeerCondition::Disconnected, + }); + } + ConnectionStatus::Dialling => {} + } + } + } + RegistrationStatus::Pending => {} + } + + let inner_poll = self.inner.poll(cx, params); + + // reset the timer if we successfully registered + if let Poll::Ready(NetworkBehaviourAction::GenerateEvent( + libp2p::rendezvous::Event::Registered { ttl, .. }, + )) = &inner_poll + { + let half_of_ttl = Duration::from_secs(*ttl) / 2; + + self.registration_status = RegistrationStatus::Registered { + re_register_in: Box::pin(tokio::time::sleep(half_of_ttl)), + }; + } + + inner_poll + } + } + + #[cfg(test)] + mod tests { + use super::*; + use crate::network::test::{new_swarm, SwarmExt}; + use futures::StreamExt; + use libp2p::swarm::SwarmEvent; + + #[tokio::test] + async fn given_no_initial_connection_when_constructed_asb_connects_and_registers_with_rendezvous_node( + ) { + let mut rendezvous_node = new_swarm(|_, identity| { + libp2p::rendezvous::Rendezvous::new(identity, libp2p::rendezvous::Config::default()) + }); + let rendezvous_address = rendezvous_node.listen_on_random_memory_address().await; + + let mut asb = new_swarm(|_, identity| { + rendezous::Behaviour::new( + identity, + *rendezvous_node.local_peer_id(), + rendezvous_address, + XmrBtcNamespace::Testnet, + None, + ) + }); + asb.listen_on_random_memory_address().await; // this adds an external address + + tokio::spawn(async move { + loop { + rendezvous_node.next().await; + } + }); + let asb_registered = tokio::spawn(async move { + loop { + if let SwarmEvent::Behaviour(libp2p::rendezvous::Event::Registered { .. }) = + asb.select_next_some().await + { + break; + } + } + }); + + tokio::time::timeout(Duration::from_secs(10), asb_registered) + .await + .unwrap() + .unwrap(); + } + + #[tokio::test] + async fn asb_automatically_re_registers() { + let min_ttl = 5; + let mut rendezvous_node = new_swarm(|_, identity| { + libp2p::rendezvous::Rendezvous::new( + identity, + libp2p::rendezvous::Config::default().with_min_ttl(min_ttl), + ) + }); + let rendezvous_address = rendezvous_node.listen_on_random_memory_address().await; + + let mut asb = new_swarm(|_, identity| { + rendezous::Behaviour::new( + identity, + *rendezvous_node.local_peer_id(), + rendezvous_address, + XmrBtcNamespace::Testnet, + Some(5), + ) + }); + asb.listen_on_random_memory_address().await; // this adds an external address + + tokio::spawn(async move { + loop { + rendezvous_node.next().await; + } + }); + let asb_registered_three_times = tokio::spawn(async move { + let mut number_of_registrations = 0; + + loop { + if let SwarmEvent::Behaviour(libp2p::rendezvous::Event::Registered { .. }) = + asb.select_next_some().await + { + number_of_registrations += 1 + } + + if number_of_registrations == 3 { + break; + } + } + }); + + tokio::time::timeout(Duration::from_secs(30), asb_registered_three_times) + .await + .unwrap() + .unwrap(); + } + } } diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index 16b47367..3f43fe21 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -16,6 +16,7 @@ use anyhow::{bail, Context, Result}; use comfy_table::Table; use libp2p::core::multiaddr::Protocol; use libp2p::core::Multiaddr; +use libp2p::swarm::AddressScore; use libp2p::Swarm; use std::env; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -29,6 +30,7 @@ use swap::asb::config::{ use swap::asb::{cancel, punish, redeem, refund, safely_abort, EventLoop, Finality, KrakenRate}; use swap::database::Database; use swap::monero::Amount; +use swap::network::rendezvous::XmrBtcNamespace; use swap::network::swarm; use swap::protocol::alice::run; use swap::seed::Seed; @@ -121,7 +123,7 @@ async fn main() -> Result<()> { info!(%monero_balance, "Initialized Monero wallet"); } - let kraken_price_updates = kraken::connect(config.maker.price_ticker_ws_url)?; + let kraken_price_updates = kraken::connect(config.maker.price_ticker_ws_url.clone())?; // setup Tor hidden services let tor_client = @@ -148,15 +150,33 @@ async fn main() -> Result<()> { kraken_rate.clone(), resume_only, env_config, + config.network.rendezvous_point.map(|rendezvous_point| { + ( + rendezvous_point, + if testnet { + XmrBtcNamespace::Testnet + } else { + XmrBtcNamespace::Mainnet + }, + ) + }), )?; - for listen in config.network.listen { + for listen in config.network.listen.clone() { Swarm::listen_on(&mut swarm, listen.clone()) .with_context(|| format!("Failed to listen on network interface {}", listen))?; } tracing::info!(peer_id = %swarm.local_peer_id(), "Network layer initialized"); + for external_address in config.network.external_addresses { + let _ = Swarm::add_external_address( + &mut swarm, + external_address, + AddressScore::Infinite, + ); + } + let (event_loop, mut swap_receiver) = EventLoop::new( swarm, env_config, diff --git a/swap/src/network/swarm.rs b/swap/src/network/swarm.rs index 9d2a6deb..f6a7926b 100644 --- a/swap/src/network/swarm.rs +++ b/swap/src/network/swarm.rs @@ -1,9 +1,11 @@ use crate::asb::LatestRate; +use crate::libp2p_ext::MultiAddrExt; +use crate::network::rendezvous::XmrBtcNamespace; use crate::seed::Seed; use crate::{asb, bitcoin, cli, env, tor}; -use anyhow::Result; +use anyhow::{Context, Result}; use libp2p::swarm::{NetworkBehaviour, SwarmBuilder}; -use libp2p::{identity, Swarm}; +use libp2p::{identity, Multiaddr, Swarm}; use std::fmt::Debug; #[allow(clippy::too_many_arguments)] @@ -14,13 +16,32 @@ pub fn asb( latest_rate: LR, resume_only: bool, env_config: env::Config, + rendezvous_params: Option<(Multiaddr, XmrBtcNamespace)>, ) -> Result>> where LR: LatestRate + Send + 'static + Debug + Clone, { - let behaviour = asb::Behaviour::new(min_buy, max_buy, latest_rate, resume_only, env_config); - let identity = seed.derive_libp2p_identity(); + + let rendezvous_params = if let Some((address, namespace)) = rendezvous_params { + let peer_id = address + .extract_peer_id() + .context("Rendezvous node address must contain peer ID")?; + + Some((identity.clone(), peer_id, address, namespace)) + } else { + None + }; + + let behaviour = asb::Behaviour::new( + min_buy, + max_buy, + latest_rate, + resume_only, + env_config, + rendezvous_params, + ); + let transport = asb::transport::new(&identity)?; let peer_id = identity.public().into_peer_id(); diff --git a/swap/tests/harness/mod.rs b/swap/tests/harness/mod.rs index c68ffcb9..7a03db2d 100644 --- a/swap/tests/harness/mod.rs +++ b/swap/tests/harness/mod.rs @@ -236,6 +236,7 @@ async fn start_alice( latest_rate, resume_only, env_config, + None, ) .unwrap(); swarm.listen_on(listen_address).unwrap();