diff --git a/swap/src/asb/config.rs b/swap/src/asb/config.rs index 391e4b41..1ecbebe0 100644 --- a/swap/src/asb/config.rs +++ b/swap/src/asb/config.rs @@ -1,5 +1,6 @@ use crate::env::{Mainnet, Testnet}; use crate::fs::{ensure_directory_exists, system_config_dir, system_data_dir}; +use crate::network::rendezvous::DEFAULT_RENDEZVOUS_ADDRESS; use crate::tor::{DEFAULT_CONTROL_PORT, DEFAULT_SOCKS5_PORT}; use anyhow::{bail, Context, Result}; use config::ConfigError; @@ -86,6 +87,7 @@ const DEFAULT_MAX_BUY_AMOUNT: f64 = 0.02f64; const DEFAULT_SPREAD: f64 = 0.02f64; #[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)] +#[serde(deny_unknown_fields)] pub struct Config { pub data: Data, pub network: Network, @@ -118,6 +120,10 @@ pub struct Data { #[serde(deny_unknown_fields)] pub struct Network { pub listen: Vec, + #[serde(default)] + pub rendezvous_point: Option, + #[serde(default)] + pub external_addresses: Vec, } #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] @@ -285,12 +291,25 @@ pub fn query_user_for_initial_config(testnet: bool) -> Result { } let ask_spread = Decimal::from_f64(ask_spread).context("Unable to parse spread")?; + let rendezvous_address = Input::with_theme(&ColorfulTheme::default()) + .with_prompt("Do you want to advertise your ASB instance with a rendezvous node? Enter an empty string if not.") + .default(DEFAULT_RENDEZVOUS_ADDRESS.to_string()) + .interact_text()?; + + let rendezvous_point = if rendezvous_address.is_empty() { + None + } else { + Some(Multiaddr::from_str(&rendezvous_address)?) + }; + println!(); Ok(Config { data: Data { dir: data_dir }, network: Network { listen: listen_addresses, + rendezvous_point, + external_addresses: vec![], }, bitcoin: Bitcoin { electrum_rpc_url, @@ -340,6 +359,8 @@ mod tests { }, network: Network { listen: vec![defaults.listen_address_tcp, defaults.listen_address_ws], + rendezvous_point: Some(DEFAULT_RENDEZVOUS_ADDRESS.parse().unwrap()), + external_addresses: vec![], }, monero: Monero { @@ -381,6 +402,8 @@ mod tests { }, network: Network { listen: vec![defaults.listen_address_tcp, defaults.listen_address_ws], + rendezvous_point: Some(DEFAULT_RENDEZVOUS_ADDRESS.parse().unwrap()), + external_addresses: vec![], }, monero: Monero { diff --git a/swap/src/asb/event_loop.rs b/swap/src/asb/event_loop.rs index 9aaa1a14..1a8663cb 100644 --- a/swap/src/asb/event_loop.rs +++ b/swap/src/asb/event_loop.rs @@ -1,11 +1,10 @@ use crate::asb::{Behaviour, OutEvent, Rate}; use crate::database::Database; -use crate::env::Config; use crate::network::quote::BidQuote; use crate::network::swap_setup::alice::WalletSnapshot; use crate::network::transfer_proof; use crate::protocol::alice::{AliceState, State3, Swap}; -use crate::{bitcoin, kraken, monero}; +use crate::{bitcoin, env, kraken, monero}; use anyhow::{Context, Result}; use futures::future; use futures::future::{BoxFuture, FutureExt}; @@ -37,7 +36,7 @@ where LR: LatestRate + Send + 'static + Debug + Clone, { swarm: libp2p::Swarm>, - env_config: Config, + env_config: env::Config, bitcoin_wallet: Arc, monero_wallet: Arc, db: Arc, @@ -69,7 +68,7 @@ where #[allow(clippy::too_many_arguments)] pub fn new( swarm: Swarm>, - env_config: Config, + env_config: env::Config, bitcoin_wallet: Arc, monero_wallet: Arc, db: Arc, @@ -245,6 +244,12 @@ where channel }.boxed()); } + SwarmEvent::Behaviour(OutEvent::Rendezvous(libp2p::rendezvous::Event::Registered { .. })) => { + tracing::info!("Successfully registered with rendezvous node"); + } + SwarmEvent::Behaviour(OutEvent::Rendezvous(libp2p::rendezvous::Event::RegisterFailed(error))) => { + tracing::error!("Registration with rendezvous node failed: {:#}", error); + } SwarmEvent::Behaviour(OutEvent::Failure {peer, error}) => { tracing::error!( %peer, diff --git a/swap/src/asb/network.rs b/swap/src/asb/network.rs index 60dd594f..b6e621c0 100644 --- a/swap/src/asb/network.rs +++ b/swap/src/asb/network.rs @@ -1,12 +1,33 @@ +use crate::asb::event_loop::LatestRate; +use crate::env; +use crate::network::quote::BidQuote; +use crate::network::rendezvous::XmrBtcNamespace; +use crate::network::swap_setup::alice; +use crate::network::swap_setup::alice::WalletSnapshot; +use crate::network::transport::authenticate_and_multiplex; +use crate::network::{encrypted_signature, quote, transfer_proof}; +use crate::protocol::alice::State3; +use anyhow::{anyhow, Error, Result}; +use futures::FutureExt; +use libp2p::core::connection::ConnectionId; +use libp2p::core::muxing::StreamMuxerBox; +use libp2p::core::transport::Boxed; +use libp2p::dns::TokioDnsConfig; +use libp2p::ping::{Ping, PingEvent}; +use libp2p::request_response::{RequestId, ResponseChannel}; +use libp2p::swarm::{ + DialPeerCondition, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, + PollParameters, ProtocolsHandler, +}; +use libp2p::tcp::TokioTcpConfig; +use libp2p::websocket::WsConfig; +use libp2p::{identity, Multiaddr, NetworkBehaviour, PeerId, Transport}; +use std::task::Poll; +use std::time::Duration; +use uuid::Uuid; + pub mod transport { - use crate::network::transport::authenticate_and_multiplex; - use anyhow::Result; - use libp2p::core::muxing::StreamMuxerBox; - use libp2p::core::transport::Boxed; - use libp2p::dns::TokioDnsConfig; - use libp2p::tcp::TokioTcpConfig; - use libp2p::websocket::WsConfig; - use libp2p::{identity, PeerId, Transport}; + use super::*; /// Creates the libp2p transport for the ASB. pub fn new(identity: &identity::Keypair) -> Result> { @@ -21,18 +42,7 @@ pub mod transport { } pub mod behaviour { - use crate::asb::event_loop::LatestRate; - use crate::env; - use crate::network::quote::BidQuote; - use crate::network::swap_setup::alice; - use crate::network::swap_setup::alice::WalletSnapshot; - use crate::network::{encrypted_signature, quote, transfer_proof}; - use crate::protocol::alice::State3; - use anyhow::{anyhow, Error}; - use libp2p::ping::{Ping, PingEvent}; - use libp2p::request_response::{RequestId, ResponseChannel}; - use libp2p::{NetworkBehaviour, PeerId}; - use uuid::Uuid; + use super::*; #[allow(clippy::large_enum_variant)] #[derive(Debug)] @@ -62,6 +72,7 @@ pub mod behaviour { channel: ResponseChannel<()>, peer: PeerId, }, + Rendezvous(libp2p::rendezvous::Event), Failure { peer: PeerId, error: Error, @@ -95,6 +106,7 @@ pub mod behaviour { where LR: LatestRate + Send + 'static, { + pub rendezvous: libp2p::swarm::toggle::Toggle, pub quote: quote::Behaviour, pub swap_setup: alice::Behaviour, pub transfer_proof: transfer_proof::Behaviour, @@ -116,8 +128,20 @@ pub mod behaviour { latest_rate: LR, resume_only: bool, env_config: env::Config, + rendezvous_params: Option<(identity::Keypair, PeerId, Multiaddr, XmrBtcNamespace)>, ) -> Self { Self { + rendezvous: libp2p::swarm::toggle::Toggle::from(rendezvous_params.map( + |(identity, rendezvous_peer_id, rendezvous_address, namespace)| { + rendezous::Behaviour::new( + identity, + rendezvous_peer_id, + rendezvous_address, + namespace, + None, // use default ttl on rendezvous point + ) + }, + )), quote: quote::asb(), swap_setup: alice::Behaviour::new( min_buy, @@ -138,4 +162,280 @@ pub mod behaviour { OutEvent::Other } } + + impl From for OutEvent { + fn from(event: libp2p::rendezvous::Event) -> Self { + OutEvent::Rendezvous(event) + } + } +} + +mod rendezous { + use super::*; + use std::pin::Pin; + + #[derive(PartialEq)] + enum ConnectionStatus { + Disconnected, + Dialling, + Connected, + } + + enum RegistrationStatus { + RegisterOnNextConnection, + Pending, + Registered { + re_register_in: Pin>, + }, + } + + pub struct Behaviour { + inner: libp2p::rendezvous::Rendezvous, + rendezvous_point: Multiaddr, + rendezvous_peer_id: PeerId, + namespace: XmrBtcNamespace, + registration_status: RegistrationStatus, + connection_status: ConnectionStatus, + registration_ttl: Option, + } + + impl Behaviour { + pub fn new( + identity: identity::Keypair, + rendezvous_peer_id: PeerId, + rendezvous_address: Multiaddr, + namespace: XmrBtcNamespace, + registration_ttl: Option, + ) -> Self { + Self { + inner: libp2p::rendezvous::Rendezvous::new( + identity, + libp2p::rendezvous::Config::default(), + ), + rendezvous_point: rendezvous_address, + rendezvous_peer_id, + namespace, + registration_status: RegistrationStatus::RegisterOnNextConnection, + connection_status: ConnectionStatus::Disconnected, + registration_ttl, + } + } + + fn register(&mut self) { + self.inner.register( + self.namespace.into(), + self.rendezvous_peer_id, + self.registration_ttl, + ); + } + } + + impl NetworkBehaviour for Behaviour { + type ProtocolsHandler = + ::ProtocolsHandler; + type OutEvent = libp2p::rendezvous::Event; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + self.inner.new_handler() + } + + fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { + if peer_id == &self.rendezvous_peer_id { + return vec![self.rendezvous_point.clone()]; + } + + vec![] + } + + fn inject_connected(&mut self, peer_id: &PeerId) { + if peer_id == &self.rendezvous_peer_id { + self.connection_status = ConnectionStatus::Connected; + + match &self.registration_status { + RegistrationStatus::RegisterOnNextConnection => { + self.register(); + self.registration_status = RegistrationStatus::Pending; + } + RegistrationStatus::Registered { .. } => {} + RegistrationStatus::Pending => {} + } + } + } + + fn inject_disconnected(&mut self, peer_id: &PeerId) { + if peer_id == &self.rendezvous_peer_id { + self.connection_status = ConnectionStatus::Disconnected; + } + } + + fn inject_event( + &mut self, + peer_id: PeerId, + connection: ConnectionId, + event: <::Handler as ProtocolsHandler>::OutEvent, + ) { + self.inner.inject_event(peer_id, connection, event) + } + + fn inject_dial_failure(&mut self, peer_id: &PeerId) { + if peer_id == &self.rendezvous_peer_id { + self.connection_status = ConnectionStatus::Disconnected; + } + } + + #[allow(clippy::type_complexity)] + fn poll(&mut self, cx: &mut std::task::Context<'_>, params: &mut impl PollParameters) -> Poll::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>>{ + match &mut self.registration_status { + RegistrationStatus::RegisterOnNextConnection => match self.connection_status { + ConnectionStatus::Disconnected => { + self.connection_status = ConnectionStatus::Dialling; + + return Poll::Ready(NetworkBehaviourAction::DialPeer { + peer_id: self.rendezvous_peer_id, + condition: DialPeerCondition::Disconnected, + }); + } + ConnectionStatus::Dialling => {} + ConnectionStatus::Connected => { + self.registration_status = RegistrationStatus::Pending; + self.register(); + } + }, + RegistrationStatus::Registered { re_register_in } => { + if let Poll::Ready(()) = re_register_in.poll_unpin(cx) { + match self.connection_status { + ConnectionStatus::Connected => { + self.registration_status = RegistrationStatus::Pending; + self.register(); + } + ConnectionStatus::Disconnected => { + self.registration_status = + RegistrationStatus::RegisterOnNextConnection; + + return Poll::Ready(NetworkBehaviourAction::DialPeer { + peer_id: self.rendezvous_peer_id, + condition: DialPeerCondition::Disconnected, + }); + } + ConnectionStatus::Dialling => {} + } + } + } + RegistrationStatus::Pending => {} + } + + let inner_poll = self.inner.poll(cx, params); + + // reset the timer if we successfully registered + if let Poll::Ready(NetworkBehaviourAction::GenerateEvent( + libp2p::rendezvous::Event::Registered { ttl, .. }, + )) = &inner_poll + { + let half_of_ttl = Duration::from_secs(*ttl) / 2; + + self.registration_status = RegistrationStatus::Registered { + re_register_in: Box::pin(tokio::time::sleep(half_of_ttl)), + }; + } + + inner_poll + } + } + + #[cfg(test)] + mod tests { + use super::*; + use crate::network::test::{new_swarm, SwarmExt}; + use futures::StreamExt; + use libp2p::swarm::SwarmEvent; + + #[tokio::test] + async fn given_no_initial_connection_when_constructed_asb_connects_and_registers_with_rendezvous_node( + ) { + let mut rendezvous_node = new_swarm(|_, identity| { + libp2p::rendezvous::Rendezvous::new(identity, libp2p::rendezvous::Config::default()) + }); + let rendezvous_address = rendezvous_node.listen_on_random_memory_address().await; + + let mut asb = new_swarm(|_, identity| { + rendezous::Behaviour::new( + identity, + *rendezvous_node.local_peer_id(), + rendezvous_address, + XmrBtcNamespace::Testnet, + None, + ) + }); + asb.listen_on_random_memory_address().await; // this adds an external address + + tokio::spawn(async move { + loop { + rendezvous_node.next().await; + } + }); + let asb_registered = tokio::spawn(async move { + loop { + if let SwarmEvent::Behaviour(libp2p::rendezvous::Event::Registered { .. }) = + asb.select_next_some().await + { + break; + } + } + }); + + tokio::time::timeout(Duration::from_secs(10), asb_registered) + .await + .unwrap() + .unwrap(); + } + + #[tokio::test] + async fn asb_automatically_re_registers() { + let min_ttl = 5; + let mut rendezvous_node = new_swarm(|_, identity| { + libp2p::rendezvous::Rendezvous::new( + identity, + libp2p::rendezvous::Config::default().with_min_ttl(min_ttl), + ) + }); + let rendezvous_address = rendezvous_node.listen_on_random_memory_address().await; + + let mut asb = new_swarm(|_, identity| { + rendezous::Behaviour::new( + identity, + *rendezvous_node.local_peer_id(), + rendezvous_address, + XmrBtcNamespace::Testnet, + Some(5), + ) + }); + asb.listen_on_random_memory_address().await; // this adds an external address + + tokio::spawn(async move { + loop { + rendezvous_node.next().await; + } + }); + let asb_registered_three_times = tokio::spawn(async move { + let mut number_of_registrations = 0; + + loop { + if let SwarmEvent::Behaviour(libp2p::rendezvous::Event::Registered { .. }) = + asb.select_next_some().await + { + number_of_registrations += 1 + } + + if number_of_registrations == 3 { + break; + } + } + }); + + tokio::time::timeout(Duration::from_secs(30), asb_registered_three_times) + .await + .unwrap() + .unwrap(); + } + } } diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index 16b47367..3f43fe21 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -16,6 +16,7 @@ use anyhow::{bail, Context, Result}; use comfy_table::Table; use libp2p::core::multiaddr::Protocol; use libp2p::core::Multiaddr; +use libp2p::swarm::AddressScore; use libp2p::Swarm; use std::env; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -29,6 +30,7 @@ use swap::asb::config::{ use swap::asb::{cancel, punish, redeem, refund, safely_abort, EventLoop, Finality, KrakenRate}; use swap::database::Database; use swap::monero::Amount; +use swap::network::rendezvous::XmrBtcNamespace; use swap::network::swarm; use swap::protocol::alice::run; use swap::seed::Seed; @@ -121,7 +123,7 @@ async fn main() -> Result<()> { info!(%monero_balance, "Initialized Monero wallet"); } - let kraken_price_updates = kraken::connect(config.maker.price_ticker_ws_url)?; + let kraken_price_updates = kraken::connect(config.maker.price_ticker_ws_url.clone())?; // setup Tor hidden services let tor_client = @@ -148,15 +150,33 @@ async fn main() -> Result<()> { kraken_rate.clone(), resume_only, env_config, + config.network.rendezvous_point.map(|rendezvous_point| { + ( + rendezvous_point, + if testnet { + XmrBtcNamespace::Testnet + } else { + XmrBtcNamespace::Mainnet + }, + ) + }), )?; - for listen in config.network.listen { + for listen in config.network.listen.clone() { Swarm::listen_on(&mut swarm, listen.clone()) .with_context(|| format!("Failed to listen on network interface {}", listen))?; } tracing::info!(peer_id = %swarm.local_peer_id(), "Network layer initialized"); + for external_address in config.network.external_addresses { + let _ = Swarm::add_external_address( + &mut swarm, + external_address, + AddressScore::Infinite, + ); + } + let (event_loop, mut swap_receiver) = EventLoop::new( swarm, env_config, diff --git a/swap/src/network/swarm.rs b/swap/src/network/swarm.rs index 9d2a6deb..f6a7926b 100644 --- a/swap/src/network/swarm.rs +++ b/swap/src/network/swarm.rs @@ -1,9 +1,11 @@ use crate::asb::LatestRate; +use crate::libp2p_ext::MultiAddrExt; +use crate::network::rendezvous::XmrBtcNamespace; use crate::seed::Seed; use crate::{asb, bitcoin, cli, env, tor}; -use anyhow::Result; +use anyhow::{Context, Result}; use libp2p::swarm::{NetworkBehaviour, SwarmBuilder}; -use libp2p::{identity, Swarm}; +use libp2p::{identity, Multiaddr, Swarm}; use std::fmt::Debug; #[allow(clippy::too_many_arguments)] @@ -14,13 +16,32 @@ pub fn asb( latest_rate: LR, resume_only: bool, env_config: env::Config, + rendezvous_params: Option<(Multiaddr, XmrBtcNamespace)>, ) -> Result>> where LR: LatestRate + Send + 'static + Debug + Clone, { - let behaviour = asb::Behaviour::new(min_buy, max_buy, latest_rate, resume_only, env_config); - let identity = seed.derive_libp2p_identity(); + + let rendezvous_params = if let Some((address, namespace)) = rendezvous_params { + let peer_id = address + .extract_peer_id() + .context("Rendezvous node address must contain peer ID")?; + + Some((identity.clone(), peer_id, address, namespace)) + } else { + None + }; + + let behaviour = asb::Behaviour::new( + min_buy, + max_buy, + latest_rate, + resume_only, + env_config, + rendezvous_params, + ); + let transport = asb::transport::new(&identity)?; let peer_id = identity.public().into_peer_id(); diff --git a/swap/tests/harness/mod.rs b/swap/tests/harness/mod.rs index c68ffcb9..7a03db2d 100644 --- a/swap/tests/harness/mod.rs +++ b/swap/tests/harness/mod.rs @@ -236,6 +236,7 @@ async fn start_alice( latest_rate, resume_only, env_config, + None, ) .unwrap(); swarm.listen_on(listen_address).unwrap();