Integrate rendezvous protocol into ASB

The rendezvous protocol allows us to register all of our external
addresses. Hence, the first step is to allow the user to configure
external addresses as part of the config. In the future, there might
be an automated way of determining these.

To register with a rendezvous node, the user needs to configure which
one. CoBloX is running a rendezvous node that acts as the default by
every spec-compliant node will do the job just fine. This behaviour
is optional which is why our custom behaviour is wrapped in a `Toggle`.

We also want our node to re-register after half the time of the
registration has passed. To make this simpler and allow for testing in
isolation, we create a custom behaviour that wraps the libp2p rendezvous
behaviour.
This commit is contained in:
rishflab 2021-06-29 14:53:26 +10:00 committed by Thomas Eizinger
parent ff10edd8a4
commit 93a0692998
No known key found for this signature in database
GPG Key ID: 651AC83A6C6C8B96
6 changed files with 400 additions and 30 deletions

View File

@ -1,5 +1,6 @@
use crate::env::{Mainnet, Testnet};
use crate::fs::{ensure_directory_exists, system_config_dir, system_data_dir};
use crate::network::rendezvous::DEFAULT_RENDEZVOUS_ADDRESS;
use crate::tor::{DEFAULT_CONTROL_PORT, DEFAULT_SOCKS5_PORT};
use anyhow::{bail, Context, Result};
use config::ConfigError;
@ -86,6 +87,7 @@ const DEFAULT_MAX_BUY_AMOUNT: f64 = 0.02f64;
const DEFAULT_SPREAD: f64 = 0.02f64;
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct Config {
pub data: Data,
pub network: Network,
@ -118,6 +120,10 @@ pub struct Data {
#[serde(deny_unknown_fields)]
pub struct Network {
pub listen: Vec<Multiaddr>,
#[serde(default)]
pub rendezvous_point: Option<Multiaddr>,
#[serde(default)]
pub external_addresses: Vec<Multiaddr>,
}
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
@ -285,12 +291,25 @@ pub fn query_user_for_initial_config(testnet: bool) -> Result<Config> {
}
let ask_spread = Decimal::from_f64(ask_spread).context("Unable to parse spread")?;
let rendezvous_address = Input::with_theme(&ColorfulTheme::default())
.with_prompt("Do you want to advertise your ASB instance with a rendezvous node? Enter an empty string if not.")
.default(DEFAULT_RENDEZVOUS_ADDRESS.to_string())
.interact_text()?;
let rendezvous_point = if rendezvous_address.is_empty() {
None
} else {
Some(Multiaddr::from_str(&rendezvous_address)?)
};
println!();
Ok(Config {
data: Data { dir: data_dir },
network: Network {
listen: listen_addresses,
rendezvous_point,
external_addresses: vec![],
},
bitcoin: Bitcoin {
electrum_rpc_url,
@ -340,6 +359,8 @@ mod tests {
},
network: Network {
listen: vec![defaults.listen_address_tcp, defaults.listen_address_ws],
rendezvous_point: Some(DEFAULT_RENDEZVOUS_ADDRESS.parse().unwrap()),
external_addresses: vec![],
},
monero: Monero {
@ -381,6 +402,8 @@ mod tests {
},
network: Network {
listen: vec![defaults.listen_address_tcp, defaults.listen_address_ws],
rendezvous_point: Some(DEFAULT_RENDEZVOUS_ADDRESS.parse().unwrap()),
external_addresses: vec![],
},
monero: Monero {

View File

@ -1,11 +1,10 @@
use crate::asb::{Behaviour, OutEvent, Rate};
use crate::database::Database;
use crate::env::Config;
use crate::network::quote::BidQuote;
use crate::network::swap_setup::alice::WalletSnapshot;
use crate::network::transfer_proof;
use crate::protocol::alice::{AliceState, State3, Swap};
use crate::{bitcoin, kraken, monero};
use crate::{bitcoin, env, kraken, monero};
use anyhow::{Context, Result};
use futures::future;
use futures::future::{BoxFuture, FutureExt};
@ -37,7 +36,7 @@ where
LR: LatestRate + Send + 'static + Debug + Clone,
{
swarm: libp2p::Swarm<Behaviour<LR>>,
env_config: Config,
env_config: env::Config,
bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>,
db: Arc<Database>,
@ -69,7 +68,7 @@ where
#[allow(clippy::too_many_arguments)]
pub fn new(
swarm: Swarm<Behaviour<LR>>,
env_config: Config,
env_config: env::Config,
bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>,
db: Arc<Database>,
@ -245,6 +244,12 @@ where
channel
}.boxed());
}
SwarmEvent::Behaviour(OutEvent::Rendezvous(libp2p::rendezvous::Event::Registered { .. })) => {
tracing::info!("Successfully registered with rendezvous node");
}
SwarmEvent::Behaviour(OutEvent::Rendezvous(libp2p::rendezvous::Event::RegisterFailed(error))) => {
tracing::error!("Registration with rendezvous node failed: {:#}", error);
}
SwarmEvent::Behaviour(OutEvent::Failure {peer, error}) => {
tracing::error!(
%peer,

View File

@ -1,12 +1,33 @@
use crate::asb::event_loop::LatestRate;
use crate::env;
use crate::network::quote::BidQuote;
use crate::network::rendezvous::XmrBtcNamespace;
use crate::network::swap_setup::alice;
use crate::network::swap_setup::alice::WalletSnapshot;
use crate::network::transport::authenticate_and_multiplex;
use crate::network::{encrypted_signature, quote, transfer_proof};
use crate::protocol::alice::State3;
use anyhow::{anyhow, Error, Result};
use futures::FutureExt;
use libp2p::core::connection::ConnectionId;
use libp2p::core::muxing::StreamMuxerBox;
use libp2p::core::transport::Boxed;
use libp2p::dns::TokioDnsConfig;
use libp2p::ping::{Ping, PingEvent};
use libp2p::request_response::{RequestId, ResponseChannel};
use libp2p::swarm::{
DialPeerCondition, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction,
PollParameters, ProtocolsHandler,
};
use libp2p::tcp::TokioTcpConfig;
use libp2p::websocket::WsConfig;
use libp2p::{identity, Multiaddr, NetworkBehaviour, PeerId, Transport};
use std::task::Poll;
use std::time::Duration;
use uuid::Uuid;
pub mod transport {
use crate::network::transport::authenticate_and_multiplex;
use anyhow::Result;
use libp2p::core::muxing::StreamMuxerBox;
use libp2p::core::transport::Boxed;
use libp2p::dns::TokioDnsConfig;
use libp2p::tcp::TokioTcpConfig;
use libp2p::websocket::WsConfig;
use libp2p::{identity, PeerId, Transport};
use super::*;
/// Creates the libp2p transport for the ASB.
pub fn new(identity: &identity::Keypair) -> Result<Boxed<(PeerId, StreamMuxerBox)>> {
@ -21,18 +42,7 @@ pub mod transport {
}
pub mod behaviour {
use crate::asb::event_loop::LatestRate;
use crate::env;
use crate::network::quote::BidQuote;
use crate::network::swap_setup::alice;
use crate::network::swap_setup::alice::WalletSnapshot;
use crate::network::{encrypted_signature, quote, transfer_proof};
use crate::protocol::alice::State3;
use anyhow::{anyhow, Error};
use libp2p::ping::{Ping, PingEvent};
use libp2p::request_response::{RequestId, ResponseChannel};
use libp2p::{NetworkBehaviour, PeerId};
use uuid::Uuid;
use super::*;
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
@ -62,6 +72,7 @@ pub mod behaviour {
channel: ResponseChannel<()>,
peer: PeerId,
},
Rendezvous(libp2p::rendezvous::Event),
Failure {
peer: PeerId,
error: Error,
@ -95,6 +106,7 @@ pub mod behaviour {
where
LR: LatestRate + Send + 'static,
{
pub rendezvous: libp2p::swarm::toggle::Toggle<rendezous::Behaviour>,
pub quote: quote::Behaviour,
pub swap_setup: alice::Behaviour<LR>,
pub transfer_proof: transfer_proof::Behaviour,
@ -116,8 +128,20 @@ pub mod behaviour {
latest_rate: LR,
resume_only: bool,
env_config: env::Config,
rendezvous_params: Option<(identity::Keypair, PeerId, Multiaddr, XmrBtcNamespace)>,
) -> Self {
Self {
rendezvous: libp2p::swarm::toggle::Toggle::from(rendezvous_params.map(
|(identity, rendezvous_peer_id, rendezvous_address, namespace)| {
rendezous::Behaviour::new(
identity,
rendezvous_peer_id,
rendezvous_address,
namespace,
None, // use default ttl on rendezvous point
)
},
)),
quote: quote::asb(),
swap_setup: alice::Behaviour::new(
min_buy,
@ -138,4 +162,280 @@ pub mod behaviour {
OutEvent::Other
}
}
impl From<libp2p::rendezvous::Event> for OutEvent {
fn from(event: libp2p::rendezvous::Event) -> Self {
OutEvent::Rendezvous(event)
}
}
}
mod rendezous {
use super::*;
use std::pin::Pin;
#[derive(PartialEq)]
enum ConnectionStatus {
Disconnected,
Dialling,
Connected,
}
enum RegistrationStatus {
RegisterOnNextConnection,
Pending,
Registered {
re_register_in: Pin<Box<tokio::time::Sleep>>,
},
}
pub struct Behaviour {
inner: libp2p::rendezvous::Rendezvous,
rendezvous_point: Multiaddr,
rendezvous_peer_id: PeerId,
namespace: XmrBtcNamespace,
registration_status: RegistrationStatus,
connection_status: ConnectionStatus,
registration_ttl: Option<u64>,
}
impl Behaviour {
pub fn new(
identity: identity::Keypair,
rendezvous_peer_id: PeerId,
rendezvous_address: Multiaddr,
namespace: XmrBtcNamespace,
registration_ttl: Option<u64>,
) -> Self {
Self {
inner: libp2p::rendezvous::Rendezvous::new(
identity,
libp2p::rendezvous::Config::default(),
),
rendezvous_point: rendezvous_address,
rendezvous_peer_id,
namespace,
registration_status: RegistrationStatus::RegisterOnNextConnection,
connection_status: ConnectionStatus::Disconnected,
registration_ttl,
}
}
fn register(&mut self) {
self.inner.register(
self.namespace.into(),
self.rendezvous_peer_id,
self.registration_ttl,
);
}
}
impl NetworkBehaviour for Behaviour {
type ProtocolsHandler =
<libp2p::rendezvous::Rendezvous as NetworkBehaviour>::ProtocolsHandler;
type OutEvent = libp2p::rendezvous::Event;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
self.inner.new_handler()
}
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
if peer_id == &self.rendezvous_peer_id {
return vec![self.rendezvous_point.clone()];
}
vec![]
}
fn inject_connected(&mut self, peer_id: &PeerId) {
if peer_id == &self.rendezvous_peer_id {
self.connection_status = ConnectionStatus::Connected;
match &self.registration_status {
RegistrationStatus::RegisterOnNextConnection => {
self.register();
self.registration_status = RegistrationStatus::Pending;
}
RegistrationStatus::Registered { .. } => {}
RegistrationStatus::Pending => {}
}
}
}
fn inject_disconnected(&mut self, peer_id: &PeerId) {
if peer_id == &self.rendezvous_peer_id {
self.connection_status = ConnectionStatus::Disconnected;
}
}
fn inject_event(
&mut self,
peer_id: PeerId,
connection: ConnectionId,
event: <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
) {
self.inner.inject_event(peer_id, connection, event)
}
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
if peer_id == &self.rendezvous_peer_id {
self.connection_status = ConnectionStatus::Disconnected;
}
}
#[allow(clippy::type_complexity)]
fn poll(&mut self, cx: &mut std::task::Context<'_>, params: &mut impl PollParameters) -> Poll<NetworkBehaviourAction<<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>>{
match &mut self.registration_status {
RegistrationStatus::RegisterOnNextConnection => match self.connection_status {
ConnectionStatus::Disconnected => {
self.connection_status = ConnectionStatus::Dialling;
return Poll::Ready(NetworkBehaviourAction::DialPeer {
peer_id: self.rendezvous_peer_id,
condition: DialPeerCondition::Disconnected,
});
}
ConnectionStatus::Dialling => {}
ConnectionStatus::Connected => {
self.registration_status = RegistrationStatus::Pending;
self.register();
}
},
RegistrationStatus::Registered { re_register_in } => {
if let Poll::Ready(()) = re_register_in.poll_unpin(cx) {
match self.connection_status {
ConnectionStatus::Connected => {
self.registration_status = RegistrationStatus::Pending;
self.register();
}
ConnectionStatus::Disconnected => {
self.registration_status =
RegistrationStatus::RegisterOnNextConnection;
return Poll::Ready(NetworkBehaviourAction::DialPeer {
peer_id: self.rendezvous_peer_id,
condition: DialPeerCondition::Disconnected,
});
}
ConnectionStatus::Dialling => {}
}
}
}
RegistrationStatus::Pending => {}
}
let inner_poll = self.inner.poll(cx, params);
// reset the timer if we successfully registered
if let Poll::Ready(NetworkBehaviourAction::GenerateEvent(
libp2p::rendezvous::Event::Registered { ttl, .. },
)) = &inner_poll
{
let half_of_ttl = Duration::from_secs(*ttl) / 2;
self.registration_status = RegistrationStatus::Registered {
re_register_in: Box::pin(tokio::time::sleep(half_of_ttl)),
};
}
inner_poll
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::network::test::{new_swarm, SwarmExt};
use futures::StreamExt;
use libp2p::swarm::SwarmEvent;
#[tokio::test]
async fn given_no_initial_connection_when_constructed_asb_connects_and_registers_with_rendezvous_node(
) {
let mut rendezvous_node = new_swarm(|_, identity| {
libp2p::rendezvous::Rendezvous::new(identity, libp2p::rendezvous::Config::default())
});
let rendezvous_address = rendezvous_node.listen_on_random_memory_address().await;
let mut asb = new_swarm(|_, identity| {
rendezous::Behaviour::new(
identity,
*rendezvous_node.local_peer_id(),
rendezvous_address,
XmrBtcNamespace::Testnet,
None,
)
});
asb.listen_on_random_memory_address().await; // this adds an external address
tokio::spawn(async move {
loop {
rendezvous_node.next().await;
}
});
let asb_registered = tokio::spawn(async move {
loop {
if let SwarmEvent::Behaviour(libp2p::rendezvous::Event::Registered { .. }) =
asb.select_next_some().await
{
break;
}
}
});
tokio::time::timeout(Duration::from_secs(10), asb_registered)
.await
.unwrap()
.unwrap();
}
#[tokio::test]
async fn asb_automatically_re_registers() {
let min_ttl = 5;
let mut rendezvous_node = new_swarm(|_, identity| {
libp2p::rendezvous::Rendezvous::new(
identity,
libp2p::rendezvous::Config::default().with_min_ttl(min_ttl),
)
});
let rendezvous_address = rendezvous_node.listen_on_random_memory_address().await;
let mut asb = new_swarm(|_, identity| {
rendezous::Behaviour::new(
identity,
*rendezvous_node.local_peer_id(),
rendezvous_address,
XmrBtcNamespace::Testnet,
Some(5),
)
});
asb.listen_on_random_memory_address().await; // this adds an external address
tokio::spawn(async move {
loop {
rendezvous_node.next().await;
}
});
let asb_registered_three_times = tokio::spawn(async move {
let mut number_of_registrations = 0;
loop {
if let SwarmEvent::Behaviour(libp2p::rendezvous::Event::Registered { .. }) =
asb.select_next_some().await
{
number_of_registrations += 1
}
if number_of_registrations == 3 {
break;
}
}
});
tokio::time::timeout(Duration::from_secs(30), asb_registered_three_times)
.await
.unwrap()
.unwrap();
}
}
}

View File

@ -16,6 +16,7 @@ use anyhow::{bail, Context, Result};
use comfy_table::Table;
use libp2p::core::multiaddr::Protocol;
use libp2p::core::Multiaddr;
use libp2p::swarm::AddressScore;
use libp2p::Swarm;
use std::env;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
@ -29,6 +30,7 @@ use swap::asb::config::{
use swap::asb::{cancel, punish, redeem, refund, safely_abort, EventLoop, Finality, KrakenRate};
use swap::database::Database;
use swap::monero::Amount;
use swap::network::rendezvous::XmrBtcNamespace;
use swap::network::swarm;
use swap::protocol::alice::run;
use swap::seed::Seed;
@ -121,7 +123,7 @@ async fn main() -> Result<()> {
info!(%monero_balance, "Initialized Monero wallet");
}
let kraken_price_updates = kraken::connect(config.maker.price_ticker_ws_url)?;
let kraken_price_updates = kraken::connect(config.maker.price_ticker_ws_url.clone())?;
// setup Tor hidden services
let tor_client =
@ -148,15 +150,33 @@ async fn main() -> Result<()> {
kraken_rate.clone(),
resume_only,
env_config,
config.network.rendezvous_point.map(|rendezvous_point| {
(
rendezvous_point,
if testnet {
XmrBtcNamespace::Testnet
} else {
XmrBtcNamespace::Mainnet
},
)
}),
)?;
for listen in config.network.listen {
for listen in config.network.listen.clone() {
Swarm::listen_on(&mut swarm, listen.clone())
.with_context(|| format!("Failed to listen on network interface {}", listen))?;
}
tracing::info!(peer_id = %swarm.local_peer_id(), "Network layer initialized");
for external_address in config.network.external_addresses {
let _ = Swarm::add_external_address(
&mut swarm,
external_address,
AddressScore::Infinite,
);
}
let (event_loop, mut swap_receiver) = EventLoop::new(
swarm,
env_config,

View File

@ -1,9 +1,11 @@
use crate::asb::LatestRate;
use crate::libp2p_ext::MultiAddrExt;
use crate::network::rendezvous::XmrBtcNamespace;
use crate::seed::Seed;
use crate::{asb, bitcoin, cli, env, tor};
use anyhow::Result;
use anyhow::{Context, Result};
use libp2p::swarm::{NetworkBehaviour, SwarmBuilder};
use libp2p::{identity, Swarm};
use libp2p::{identity, Multiaddr, Swarm};
use std::fmt::Debug;
#[allow(clippy::too_many_arguments)]
@ -14,13 +16,32 @@ pub fn asb<LR>(
latest_rate: LR,
resume_only: bool,
env_config: env::Config,
rendezvous_params: Option<(Multiaddr, XmrBtcNamespace)>,
) -> Result<Swarm<asb::Behaviour<LR>>>
where
LR: LatestRate + Send + 'static + Debug + Clone,
{
let behaviour = asb::Behaviour::new(min_buy, max_buy, latest_rate, resume_only, env_config);
let identity = seed.derive_libp2p_identity();
let rendezvous_params = if let Some((address, namespace)) = rendezvous_params {
let peer_id = address
.extract_peer_id()
.context("Rendezvous node address must contain peer ID")?;
Some((identity.clone(), peer_id, address, namespace))
} else {
None
};
let behaviour = asb::Behaviour::new(
min_buy,
max_buy,
latest_rate,
resume_only,
env_config,
rendezvous_params,
);
let transport = asb::transport::new(&identity)?;
let peer_id = identity.public().into_peer_id();

View File

@ -236,6 +236,7 @@ async fn start_alice(
latest_rate,
resume_only,
env_config,
None,
)
.unwrap();
swarm.listen_on(listen_address).unwrap();