1417: feat(asb): allow asb to register with mulitple rendezvous nodes r=binarybaron a=delta1

implements #633  

This PR updates the ASB to be able to register with multiple rendezvous nodes. A unit test is added that checks this behaviour. I also tested it manually overnight and it repeatedly reregistered with the specified nodes.  

The config file option is left as `rendezvous_point` for backwards compatibility, but now uses the same deserialization as introduced in #1231 so that multiple addresses can be specified in a comma separated string, or as a toml array of strings.

Co-authored-by: Byron Hambly <bhambly@blockstream.com>
This commit is contained in:
bors[bot] 2023-08-04 13:57:35 +00:00 committed by GitHub
commit 9c811f87cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 300 additions and 171 deletions

View File

@ -9,7 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed ### Changed
- Minimum Supported Rust Version (MSRV) bumped to 1.63 - Minimum Supported Rust Version (MSRV) bumped to 1.67
- ASB can now register with multiple rendezvous nodes. The `rendezvous_point` option in `config.toml` can be a string with comma separated addresses, or a toml array of address strings.
## [0.12.1] - 2023-01-09 ## [0.12.1] - 2023-01-09

View File

@ -42,13 +42,16 @@ Since the ASB is a long running task we specify the person running an ASB as ser
The ASB daemon supports the libp2p [rendezvous-protocol](https://github.com/libp2p/specs/tree/master/rendezvous). The ASB daemon supports the libp2p [rendezvous-protocol](https://github.com/libp2p/specs/tree/master/rendezvous).
Usage of the rendezvous functionality is entirely optional. Usage of the rendezvous functionality is entirely optional.
You can configure a rendezvous point in the `[network]` section of your config file. You can configure one or more rendezvous point in the `[network]` section of your config file.
For the registration to be successful, you also need to configure the externally reachable addresses within the `[network]` section. For the registration to be successful, you also need to configure the externally reachable addresses within the `[network]` section.
For example: For example:
```toml ```toml
[network] [network]
rendezvous_point = "/dns4/discover.unstoppableswap.net/tcp/8888/p2p/12D3KooWA6cnqJpVnreBVnoro8midDL9Lpzmg8oJPoAGi7YYaamE" rendezvous_point = [
"/dns4/discover.unstoppableswap.net/tcp/8888/p2p/12D3KooWA6cnqJpVnreBVnoro8midDL9Lpzmg8oJPoAGi7YYaamE",
"/dns4/eratosthen.es/tcp/7798/p2p/12D3KooWAh7EXXa2ZyegzLGdjvj1W4G3EXrTGrf6trraoT1MEobs",
]
external_addresses = ["/dns4/example.com/tcp/9939"] external_addresses = ["/dns4/example.com/tcp/9939"]
``` ```

View File

@ -8,6 +8,7 @@ pub mod tracing;
pub use event_loop::{EventLoop, EventLoopHandle, FixedRate, KrakenRate, LatestRate}; pub use event_loop::{EventLoop, EventLoopHandle, FixedRate, KrakenRate, LatestRate};
pub use network::behaviour::{Behaviour, OutEvent}; pub use network::behaviour::{Behaviour, OutEvent};
pub use network::rendezvous::RendezvousNode;
pub use network::transport; pub use network::transport;
pub use rate::Rate; pub use rate::Rate;
pub use recovery::cancel::cancel; pub use recovery::cancel::cancel;
@ -18,4 +19,4 @@ pub use recovery::safely_abort::safely_abort;
pub use recovery::{cancel, refund}; pub use recovery::{cancel, refund};
#[cfg(test)] #[cfg(test)]
pub use network::rendezous; pub use network::rendezvous;

View File

@ -134,8 +134,8 @@ pub struct Data {
pub struct Network { pub struct Network {
#[serde(deserialize_with = "addr_list::deserialize")] #[serde(deserialize_with = "addr_list::deserialize")]
pub listen: Vec<Multiaddr>, pub listen: Vec<Multiaddr>,
#[serde(default)] #[serde(default, deserialize_with = "addr_list::deserialize")]
pub rendezvous_point: Option<Multiaddr>, pub rendezvous_point: Vec<Multiaddr>,
#[serde(default, deserialize_with = "addr_list::deserialize")] #[serde(default, deserialize_with = "addr_list::deserialize")]
pub external_addresses: Vec<Multiaddr>, pub external_addresses: Vec<Multiaddr>,
} }
@ -156,7 +156,7 @@ mod addr_list {
let list: Result<Vec<_>, _> = s let list: Result<Vec<_>, _> = s
.split(',') .split(',')
.filter(|s| !s.is_empty()) .filter(|s| !s.is_empty())
.map(|s| s.parse().map_err(de::Error::custom)) .map(|s| s.trim().parse().map_err(de::Error::custom))
.collect(); .collect();
Ok(list?) Ok(list?)
} }
@ -165,7 +165,7 @@ mod addr_list {
.iter() .iter()
.map(|v| { .map(|v| {
if let Value::String(s) = v { if let Value::String(s) = v {
s.parse().map_err(de::Error::custom) s.trim().parse().map_err(de::Error::custom)
} else { } else {
Err(de::Error::custom("expected a string")) Err(de::Error::custom("expected a string"))
} }
@ -347,10 +347,27 @@ pub fn query_user_for_initial_config(testnet: bool) -> Result<Config> {
} }
let ask_spread = Decimal::from_f64(ask_spread).context("Unable to parse spread")?; let ask_spread = Decimal::from_f64(ask_spread).context("Unable to parse spread")?;
let rendezvous_point = Input::<Multiaddr>::with_theme(&ColorfulTheme::default()) let mut number = 1;
.with_prompt("Do you want to advertise your ASB instance with a rendezvous node? Enter an empty string if not.") let mut done = false;
.allow_empty(true) let mut rendezvous_points = Vec::new();
.interact_text()?; println!("ASB can register with multiple rendezvous nodes for discoverability. This can also be edited in the config file later.");
while !done {
let prompt = format!(
"Enter the address for rendezvous node ({number}). Or just hit Enter to continue."
);
let rendezvous_addr = Input::<Multiaddr>::with_theme(&ColorfulTheme::default())
.with_prompt(prompt)
.allow_empty(true)
.interact_text()?;
if rendezvous_addr.is_empty() {
done = true;
} else if rendezvous_points.contains(&rendezvous_addr) {
println!("That rendezvous address is already in the list.");
} else {
rendezvous_points.push(rendezvous_addr);
number += 1;
}
}
println!(); println!();
@ -358,11 +375,7 @@ pub fn query_user_for_initial_config(testnet: bool) -> Result<Config> {
data: Data { dir: data_dir }, data: Data { dir: data_dir },
network: Network { network: Network {
listen: listen_addresses, listen: listen_addresses,
rendezvous_point: if rendezvous_point.is_empty() { rendezvous_point: rendezvous_points, // keeping the singular key name for backcompat
None
} else {
Some(rendezvous_point)
},
external_addresses: vec![], external_addresses: vec![],
}, },
bitcoin: Bitcoin { bitcoin: Bitcoin {
@ -417,7 +430,7 @@ mod tests {
}, },
network: Network { network: Network {
listen: vec![defaults.listen_address_tcp, defaults.listen_address_ws], listen: vec![defaults.listen_address_tcp, defaults.listen_address_ws],
rendezvous_point: None, rendezvous_point: vec![],
external_addresses: vec![], external_addresses: vec![],
}, },
monero: Monero { monero: Monero {
@ -461,7 +474,7 @@ mod tests {
}, },
network: Network { network: Network {
listen: vec![defaults.listen_address_tcp, defaults.listen_address_ws], listen: vec![defaults.listen_address_tcp, defaults.listen_address_ws],
rendezvous_point: None, rendezvous_point: vec![],
external_addresses: vec![], external_addresses: vec![],
}, },
monero: Monero { monero: Monero {
@ -515,7 +528,7 @@ mod tests {
}, },
network: Network { network: Network {
listen, listen,
rendezvous_point: None, rendezvous_point: vec![],
external_addresses, external_addresses,
}, },
monero: Monero { monero: Monero {

View File

@ -253,8 +253,8 @@ where
channel channel
}.boxed()); }.boxed());
} }
SwarmEvent::Behaviour(OutEvent::Rendezvous(libp2p::rendezvous::client::Event::Registered { .. })) => { SwarmEvent::Behaviour(OutEvent::Rendezvous(libp2p::rendezvous::client::Event::Registered { rendezvous_node, ttl, namespace })) => {
tracing::info!("Successfully registered with rendezvous node"); tracing::info!("Successfully registered with rendezvous node: {} with namespace: {} and TTL: {:?}", rendezvous_node, namespace, ttl);
} }
SwarmEvent::Behaviour(OutEvent::Rendezvous(libp2p::rendezvous::client::Event::RegisterFailed(error))) => { SwarmEvent::Behaviour(OutEvent::Rendezvous(libp2p::rendezvous::client::Event::RegisterFailed(error))) => {
tracing::error!("Registration with rendezvous node failed: {:?}", error); tracing::error!("Registration with rendezvous node failed: {:?}", error);

View File

@ -44,7 +44,9 @@ pub mod transport {
} }
pub mod behaviour { pub mod behaviour {
use super::*; use libp2p::swarm::behaviour::toggle::Toggle;
use super::{rendezvous::RendezvousNode, *};
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
#[derive(Debug)] #[derive(Debug)]
@ -108,7 +110,7 @@ pub mod behaviour {
where where
LR: LatestRate + Send + 'static, LR: LatestRate + Send + 'static,
{ {
pub rendezvous: libp2p::swarm::behaviour::toggle::Toggle<rendezous::Behaviour>, pub rendezvous: Toggle<rendezvous::Behaviour>,
pub quote: quote::Behaviour, pub quote: quote::Behaviour,
pub swap_setup: alice::Behaviour<LR>, pub swap_setup: alice::Behaviour<LR>,
pub transfer_proof: transfer_proof::Behaviour, pub transfer_proof: transfer_proof::Behaviour,
@ -132,25 +134,22 @@ pub mod behaviour {
resume_only: bool, resume_only: bool,
env_config: env::Config, env_config: env::Config,
identify_params: (identity::Keypair, XmrBtcNamespace), identify_params: (identity::Keypair, XmrBtcNamespace),
rendezvous_params: Option<(identity::Keypair, PeerId, Multiaddr, XmrBtcNamespace)>, rendezvous_nodes: Vec<RendezvousNode>,
) -> Self { ) -> Self {
let agentVersion = format!("asb/{} ({})", env!("CARGO_PKG_VERSION"), identify_params.1); let (identity, namespace) = identify_params;
let protocolVersion = "/comit/xmr/btc/1.0.0".to_string(); let agent_version = format!("asb/{} ({})", env!("CARGO_PKG_VERSION"), namespace);
let identifyConfig = IdentifyConfig::new(protocolVersion, identify_params.0.public()) let protocol_version = "/comit/xmr/btc/1.0.0".to_string();
.with_agent_version(agentVersion); let identifyConfig = IdentifyConfig::new(protocol_version, identity.public())
.with_agent_version(agent_version);
let behaviour = if rendezvous_nodes.is_empty() {
None
} else {
Some(rendezvous::Behaviour::new(identity, rendezvous_nodes))
};
Self { Self {
rendezvous: libp2p::swarm::behaviour::toggle::Toggle::from(rendezvous_params.map( rendezvous: Toggle::from(behaviour),
|(identity, rendezvous_peer_id, rendezvous_address, namespace)| {
rendezous::Behaviour::new(
identity,
rendezvous_peer_id,
rendezvous_address,
namespace,
None, // use default ttl on rendezvous point
)
},
)),
quote: quote::asb(), quote: quote::asb(),
swap_setup: alice::Behaviour::new( swap_setup: alice::Behaviour::new(
min_buy, min_buy,
@ -186,13 +185,14 @@ pub mod behaviour {
} }
} }
pub mod rendezous { pub mod rendezvous {
use super::*; use super::*;
use libp2p::swarm::dial_opts::DialOpts; use libp2p::swarm::dial_opts::DialOpts;
use libp2p::swarm::DialError; use libp2p::swarm::DialError;
use std::collections::VecDeque;
use std::pin::Pin; use std::pin::Pin;
#[derive(PartialEq)] #[derive(Clone, PartialEq)]
enum ConnectionStatus { enum ConnectionStatus {
Disconnected, Disconnected,
Dialling, Dialling,
@ -209,39 +209,59 @@ pub mod rendezous {
pub struct Behaviour { pub struct Behaviour {
inner: libp2p::rendezvous::client::Behaviour, inner: libp2p::rendezvous::client::Behaviour,
rendezvous_point: Multiaddr, rendezvous_nodes: Vec<RendezvousNode>,
rendezvous_peer_id: PeerId, to_dial: VecDeque<PeerId>,
namespace: XmrBtcNamespace,
registration_status: RegistrationStatus,
connection_status: ConnectionStatus,
registration_ttl: Option<u64>,
} }
impl Behaviour { pub struct RendezvousNode {
pub address: Multiaddr,
connection_status: ConnectionStatus,
pub peer_id: PeerId,
registration_status: RegistrationStatus,
pub registration_ttl: Option<u64>,
pub namespace: XmrBtcNamespace,
}
impl RendezvousNode {
pub fn new( pub fn new(
identity: identity::Keypair, address: &Multiaddr,
rendezvous_peer_id: PeerId, peer_id: PeerId,
rendezvous_address: Multiaddr,
namespace: XmrBtcNamespace, namespace: XmrBtcNamespace,
registration_ttl: Option<u64>, registration_ttl: Option<u64>,
) -> Self { ) -> Self {
Self { Self {
inner: libp2p::rendezvous::client::Behaviour::new(identity), address: address.to_owned(),
rendezvous_point: rendezvous_address,
rendezvous_peer_id,
namespace,
registration_status: RegistrationStatus::RegisterOnNextConnection,
connection_status: ConnectionStatus::Disconnected, connection_status: ConnectionStatus::Disconnected,
namespace,
peer_id,
registration_status: RegistrationStatus::RegisterOnNextConnection,
registration_ttl, registration_ttl,
} }
} }
fn register(&mut self) { fn set_connection(&mut self, status: ConnectionStatus) {
self.inner.register( self.connection_status = status;
self.namespace.into(), }
self.rendezvous_peer_id,
self.registration_ttl, fn set_registration(&mut self, status: RegistrationStatus) {
); self.registration_status = status;
}
}
impl Behaviour {
pub fn new(identity: identity::Keypair, rendezvous_nodes: Vec<RendezvousNode>) -> Self {
Self {
inner: libp2p::rendezvous::client::Behaviour::new(identity),
rendezvous_nodes,
to_dial: VecDeque::new(),
}
}
/// Calls the rendezvous register method of the node at node_index in the Vec of rendezvous nodes
fn register(&mut self, node_index: usize) {
let node = &self.rendezvous_nodes[node_index];
self.inner
.register(node.namespace.into(), node.peer_id, node.registration_ttl);
} }
} }
@ -255,31 +275,37 @@ pub mod rendezous {
} }
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> { fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
if peer_id == &self.rendezvous_peer_id { for node in self.rendezvous_nodes.iter() {
return vec![self.rendezvous_point.clone()]; if peer_id == &node.peer_id {
return vec![node.address.clone()];
}
} }
vec![] vec![]
} }
fn inject_connected(&mut self, peer_id: &PeerId) { fn inject_connected(&mut self, peer_id: &PeerId) {
if peer_id == &self.rendezvous_peer_id { for i in 0..self.rendezvous_nodes.len() {
self.connection_status = ConnectionStatus::Connected; if peer_id == &self.rendezvous_nodes[i].peer_id {
self.rendezvous_nodes[i].set_connection(ConnectionStatus::Connected);
match &self.registration_status { match &self.rendezvous_nodes[i].registration_status {
RegistrationStatus::RegisterOnNextConnection => { RegistrationStatus::RegisterOnNextConnection => {
self.register(); self.register(i);
self.registration_status = RegistrationStatus::Pending; self.rendezvous_nodes[i].set_registration(RegistrationStatus::Pending);
}
RegistrationStatus::Registered { .. } => {}
RegistrationStatus::Pending => {}
} }
RegistrationStatus::Registered { .. } => {}
RegistrationStatus::Pending => {}
} }
} }
} }
fn inject_disconnected(&mut self, peer_id: &PeerId) { fn inject_disconnected(&mut self, peer_id: &PeerId) {
if peer_id == &self.rendezvous_peer_id { for i in 0..self.rendezvous_nodes.len() {
self.connection_status = ConnectionStatus::Disconnected; let mut node = &mut self.rendezvous_nodes[i];
if peer_id == &node.peer_id {
node.connection_status = ConnectionStatus::Disconnected;
}
} }
} }
@ -298,9 +324,12 @@ pub mod rendezous {
_handler: Self::ProtocolsHandler, _handler: Self::ProtocolsHandler,
_error: &DialError, _error: &DialError,
) { ) {
if let Some(id) = peer_id { for i in 0..self.rendezvous_nodes.len() {
if id == self.rendezvous_peer_id { let mut node = &mut self.rendezvous_nodes[i];
self.connection_status = ConnectionStatus::Disconnected; if let Some(id) = peer_id {
if id == node.peer_id {
node.connection_status = ConnectionStatus::Disconnected;
}
} }
} }
} }
@ -311,62 +340,73 @@ pub mod rendezous {
cx: &mut std::task::Context<'_>, cx: &mut std::task::Context<'_>,
params: &mut impl PollParameters, params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> { ) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
match &mut self.registration_status { if let Some(peer_id) = self.to_dial.pop_front() {
RegistrationStatus::RegisterOnNextConnection => match self.connection_status { return Poll::Ready(NetworkBehaviourAction::Dial {
ConnectionStatus::Disconnected => { opts: DialOpts::peer_id(peer_id)
self.connection_status = ConnectionStatus::Dialling; .condition(PeerCondition::Disconnected)
.build(),
return Poll::Ready(NetworkBehaviourAction::Dial { handler: Self::ProtocolsHandler::new(Duration::from_secs(30)),
opts: DialOpts::peer_id(self.rendezvous_peer_id) });
.condition(PeerCondition::Disconnected) }
.build(), // check the status of each rendezvous node
for i in 0..self.rendezvous_nodes.len() {
handler: Self::ProtocolsHandler::new(Duration::from_secs(30)), let connection_status = self.rendezvous_nodes[i].connection_status.clone();
}); match &mut self.rendezvous_nodes[i].registration_status {
} RegistrationStatus::RegisterOnNextConnection => match connection_status {
ConnectionStatus::Dialling => {} ConnectionStatus::Disconnected => {
ConnectionStatus::Connected => { self.rendezvous_nodes[i].set_connection(ConnectionStatus::Dialling);
self.registration_status = RegistrationStatus::Pending; self.to_dial.push_back(self.rendezvous_nodes[i].peer_id);
self.register(); }
} ConnectionStatus::Dialling => {}
}, ConnectionStatus::Connected => {
RegistrationStatus::Registered { re_register_in } => { self.rendezvous_nodes[i].set_registration(RegistrationStatus::Pending);
if let Poll::Ready(()) = re_register_in.poll_unpin(cx) { self.register(i);
match self.connection_status { }
ConnectionStatus::Connected => { },
self.registration_status = RegistrationStatus::Pending; RegistrationStatus::Registered { re_register_in } => {
self.register(); if let Poll::Ready(()) = re_register_in.poll_unpin(cx) {
match connection_status {
ConnectionStatus::Connected => {
self.rendezvous_nodes[i]
.set_registration(RegistrationStatus::Pending);
self.register(i);
}
ConnectionStatus::Disconnected => {
self.rendezvous_nodes[i].set_registration(
RegistrationStatus::RegisterOnNextConnection,
);
self.to_dial.push_back(self.rendezvous_nodes[i].peer_id);
}
ConnectionStatus::Dialling => {}
} }
ConnectionStatus::Disconnected => {
self.registration_status =
RegistrationStatus::RegisterOnNextConnection;
return Poll::Ready(NetworkBehaviourAction::Dial {
opts: DialOpts::peer_id(self.rendezvous_peer_id)
.condition(PeerCondition::Disconnected)
.build(),
handler: Self::ProtocolsHandler::new(Duration::from_secs(30)),
});
}
ConnectionStatus::Dialling => {}
} }
} }
RegistrationStatus::Pending => {}
} }
RegistrationStatus::Pending => {}
} }
let inner_poll = self.inner.poll(cx, params); let inner_poll = self.inner.poll(cx, params);
// reset the timer if we successfully registered // reset the timer for the specific rendezvous node if we successfully registered
if let Poll::Ready(NetworkBehaviourAction::GenerateEvent( if let Poll::Ready(NetworkBehaviourAction::GenerateEvent(
libp2p::rendezvous::client::Event::Registered { ttl, .. }, libp2p::rendezvous::client::Event::Registered {
ttl,
rendezvous_node,
..
},
)) = &inner_poll )) = &inner_poll
{ {
let half_of_ttl = Duration::from_secs(*ttl) / 2; if let Some(i) = self
.rendezvous_nodes
self.registration_status = RegistrationStatus::Registered { .iter()
re_register_in: Box::pin(tokio::time::sleep(half_of_ttl)), .position(|n| &n.peer_id == rendezvous_node)
}; {
let half_of_ttl = Duration::from_secs(*ttl) / 2;
let re_register_in = Box::pin(tokio::time::sleep(half_of_ttl));
let status = RegistrationStatus::Registered { re_register_in };
self.rendezvous_nodes[i].set_registration(status);
}
} }
inner_poll inner_poll
@ -380,6 +420,7 @@ pub mod rendezous {
use futures::StreamExt; use futures::StreamExt;
use libp2p::rendezvous; use libp2p::rendezvous;
use libp2p::swarm::SwarmEvent; use libp2p::swarm::SwarmEvent;
use std::collections::HashMap;
#[tokio::test] #[tokio::test]
async fn given_no_initial_connection_when_constructed_asb_connects_and_registers_with_rendezvous_node( async fn given_no_initial_connection_when_constructed_asb_connects_and_registers_with_rendezvous_node(
@ -387,16 +428,16 @@ pub mod rendezous {
let mut rendezvous_node = new_swarm(|_, _| { let mut rendezvous_node = new_swarm(|_, _| {
rendezvous::server::Behaviour::new(rendezvous::server::Config::default()) rendezvous::server::Behaviour::new(rendezvous::server::Config::default())
}); });
let rendezvous_address = rendezvous_node.listen_on_random_memory_address().await; let address = rendezvous_node.listen_on_random_memory_address().await;
let rendezvous_point = RendezvousNode::new(
&address,
rendezvous_node.local_peer_id().to_owned(),
XmrBtcNamespace::Testnet,
None,
);
let mut asb = new_swarm(|_, identity| { let mut asb = new_swarm(|_, identity| {
rendezous::Behaviour::new( super::rendezvous::Behaviour::new(identity, vec![rendezvous_point])
identity,
*rendezvous_node.local_peer_id(),
rendezvous_address,
XmrBtcNamespace::Testnet,
None,
)
}); });
asb.listen_on_random_memory_address().await; // this adds an external address asb.listen_on_random_memory_address().await; // this adds an external address
@ -428,16 +469,16 @@ pub mod rendezous {
rendezvous::server::Config::default().with_min_ttl(2), rendezvous::server::Config::default().with_min_ttl(2),
) )
}); });
let rendezvous_address = rendezvous_node.listen_on_random_memory_address().await; let address = rendezvous_node.listen_on_random_memory_address().await;
let rendezvous_point = RendezvousNode::new(
&address,
rendezvous_node.local_peer_id().to_owned(),
XmrBtcNamespace::Testnet,
Some(5),
);
let mut asb = new_swarm(|_, identity| { let mut asb = new_swarm(|_, identity| {
rendezous::Behaviour::new( super::rendezvous::Behaviour::new(identity, vec![rendezvous_point])
identity,
*rendezvous_node.local_peer_id(),
rendezvous_address,
XmrBtcNamespace::Testnet,
Some(5),
)
}); });
asb.listen_on_random_memory_address().await; // this adds an external address asb.listen_on_random_memory_address().await; // this adds an external address
@ -467,5 +508,62 @@ pub mod rendezous {
.unwrap() .unwrap()
.unwrap(); .unwrap();
} }
#[tokio::test]
async fn asb_registers_multiple() {
let registration_ttl = Some(10);
let mut rendezvous_nodes = Vec::new();
let mut registrations = HashMap::new();
// register with 5 rendezvous nodes
for _ in 0..5 {
let mut rendezvous = new_swarm(|_, _| {
rendezvous::server::Behaviour::new(
rendezvous::server::Config::default().with_min_ttl(2),
)
});
let address = rendezvous.listen_on_random_memory_address().await;
let id = *rendezvous.local_peer_id();
registrations.insert(id, 0);
rendezvous_nodes.push(RendezvousNode::new(
&address,
*rendezvous.local_peer_id(),
XmrBtcNamespace::Testnet,
registration_ttl,
));
tokio::spawn(async move {
loop {
rendezvous.next().await;
}
});
}
let mut asb = new_swarm(|_, identity| {
super::rendezvous::Behaviour::new(identity, rendezvous_nodes)
});
asb.listen_on_random_memory_address().await; // this adds an external address
let handle = tokio::spawn(async move {
loop {
if let SwarmEvent::Behaviour(rendezvous::client::Event::Registered {
rendezvous_node,
..
}) = asb.select_next_some().await
{
registrations
.entry(rendezvous_node)
.and_modify(|counter| *counter += 1);
}
if registrations.iter().all(|(_, &count)| count >= 4) {
break;
}
}
});
tokio::time::timeout(Duration::from_secs(30), handle)
.await
.unwrap()
.unwrap();
}
} }
} }

View File

@ -102,6 +102,19 @@ async fn main() -> Result<()> {
match cmd { match cmd {
Command::Start { resume_only } => { Command::Start { resume_only } => {
// check and warn for duplicate rendezvous points
let mut rendezvous_addrs = config.network.rendezvous_point.clone();
let prev_len = rendezvous_addrs.len();
rendezvous_addrs.sort();
rendezvous_addrs.dedup();
let new_len = rendezvous_addrs.len();
if new_len < prev_len {
tracing::warn!(
"`rendezvous_point` config has {} duplicate entries, they are being ignored.",
prev_len - new_len
);
}
let monero_wallet = init_monero_wallet(&config, env_config).await?; let monero_wallet = init_monero_wallet(&config, env_config).await?;
let monero_address = monero_wallet.get_main_address(); let monero_address = monero_wallet.get_main_address();
tracing::info!(%monero_address, "Monero wallet address"); tracing::info!(%monero_address, "Monero wallet address");
@ -161,7 +174,7 @@ async fn main() -> Result<()> {
resume_only, resume_only,
env_config, env_config,
namespace, namespace,
config.network.rendezvous_point, &rendezvous_addrs,
)?; )?;
for listen in config.network.listen.clone() { for listen in config.network.listen.clone() {

View File

@ -15,6 +15,7 @@ pub use list_sellers::{list_sellers, Seller, Status as SellerStatus};
mod tests { mod tests {
use super::*; use super::*;
use crate::asb; use crate::asb;
use crate::asb::rendezvous::RendezvousNode;
use crate::cli::list_sellers::{Seller, Status}; use crate::cli::list_sellers::{Seller, Status};
use crate::network::quote; use crate::network::quote;
use crate::network::quote::BidQuote; use crate::network::quote::BidQuote;
@ -33,10 +34,8 @@ mod tests {
async fn list_sellers_should_report_all_registered_asbs_with_a_quote() { async fn list_sellers_should_report_all_registered_asbs_with_a_quote() {
let namespace = XmrBtcNamespace::Mainnet; let namespace = XmrBtcNamespace::Mainnet;
let (rendezvous_address, rendezvous_peer_id) = setup_rendezvous_point().await; let (rendezvous_address, rendezvous_peer_id) = setup_rendezvous_point().await;
let expected_seller_1 = let expected_seller_1 = setup_asb(rendezvous_peer_id, &rendezvous_address, namespace).await;
setup_asb(rendezvous_peer_id, rendezvous_address.clone(), namespace).await; let expected_seller_2 = setup_asb(rendezvous_peer_id, &rendezvous_address, namespace).await;
let expected_seller_2 =
setup_asb(rendezvous_peer_id, rendezvous_address.clone(), namespace).await;
let list_sellers = list_sellers( let list_sellers = list_sellers(
rendezvous_peer_id, rendezvous_peer_id,
@ -72,7 +71,7 @@ mod tests {
async fn setup_asb( async fn setup_asb(
rendezvous_peer_id: PeerId, rendezvous_peer_id: PeerId,
rendezvous_address: Multiaddr, rendezvous_address: &Multiaddr,
namespace: XmrBtcNamespace, namespace: XmrBtcNamespace,
) -> Seller { ) -> Seller {
let static_quote = BidQuote { let static_quote = BidQuote {
@ -81,18 +80,18 @@ mod tests {
max_quantity: bitcoin::Amount::from_sat(9001), max_quantity: bitcoin::Amount::from_sat(9001),
}; };
let mut asb = new_swarm(|_, identity| StaticQuoteAsbBehaviour { let mut asb = new_swarm(|_, identity| {
rendezvous: asb::rendezous::Behaviour::new( let rendezvous_node =
identity, RendezvousNode::new(rendezvous_address, rendezvous_peer_id, namespace, None);
rendezvous_peer_id, let rendezvous = asb::rendezvous::Behaviour::new(identity, vec![rendezvous_node]);
rendezvous_address,
namespace, StaticQuoteAsbBehaviour {
None, rendezvous,
), ping: Default::default(),
ping: Default::default(), quote: quote::asb(),
quote: quote::asb(), static_quote,
static_quote, registered: false,
registered: false, }
}); });
let asb_address = asb.listen_on_tcp_localhost().await; let asb_address = asb.listen_on_tcp_localhost().await;
@ -121,7 +120,7 @@ mod tests {
#[derive(libp2p::NetworkBehaviour)] #[derive(libp2p::NetworkBehaviour)]
#[behaviour(event_process = true)] #[behaviour(event_process = true)]
struct StaticQuoteAsbBehaviour { struct StaticQuoteAsbBehaviour {
rendezvous: asb::rendezous::Behaviour, rendezvous: asb::rendezvous::Behaviour,
// Support `Ping` as a workaround until https://github.com/libp2p/rust-libp2p/issues/2109 is fixed. // Support `Ping` as a workaround until https://github.com/libp2p/rust-libp2p/issues/2109 is fixed.
ping: libp2p::ping::Ping, ping: libp2p::ping::Ping,
quote: quote::Behaviour, quote: quote::Behaviour,

View File

@ -1,9 +1,9 @@
use crate::asb::LatestRate; use crate::asb::{LatestRate, RendezvousNode};
use crate::libp2p_ext::MultiAddrExt; use crate::libp2p_ext::MultiAddrExt;
use crate::network::rendezvous::XmrBtcNamespace; use crate::network::rendezvous::XmrBtcNamespace;
use crate::seed::Seed; use crate::seed::Seed;
use crate::{asb, bitcoin, cli, env, tor}; use crate::{asb, bitcoin, cli, env, tor};
use anyhow::{Context, Result}; use anyhow::Result;
use libp2p::swarm::{NetworkBehaviour, SwarmBuilder}; use libp2p::swarm::{NetworkBehaviour, SwarmBuilder};
use libp2p::{identity, Multiaddr, Swarm}; use libp2p::{identity, Multiaddr, Swarm};
use std::fmt::Debug; use std::fmt::Debug;
@ -17,22 +17,23 @@ pub fn asb<LR>(
resume_only: bool, resume_only: bool,
env_config: env::Config, env_config: env::Config,
namespace: XmrBtcNamespace, namespace: XmrBtcNamespace,
rendezvous_point: Option<Multiaddr>, rendezvous_addrs: &[Multiaddr],
) -> Result<Swarm<asb::Behaviour<LR>>> ) -> Result<Swarm<asb::Behaviour<LR>>>
where where
LR: LatestRate + Send + 'static + Debug + Clone, LR: LatestRate + Send + 'static + Debug + Clone,
{ {
let identity = seed.derive_libp2p_identity(); let identity = seed.derive_libp2p_identity();
let rendezvous_params = if let Some(address) = rendezvous_point { let rendezvous_nodes = rendezvous_addrs
let peer_id = address .iter()
.extract_peer_id() .map(|addr| {
.context("Rendezvous node address must contain peer ID")?; let peer_id = addr
.extract_peer_id()
.expect("Rendezvous node address must contain peer ID");
Some((identity.clone(), peer_id, address, namespace)) RendezvousNode::new(addr, peer_id, namespace, None)
} else { })
None .collect();
};
let behaviour = asb::Behaviour::new( let behaviour = asb::Behaviour::new(
min_buy, min_buy,
@ -41,7 +42,7 @@ where
resume_only, resume_only,
env_config, env_config,
(identity.clone(), namespace), (identity.clone(), namespace),
rendezvous_params, rendezvous_nodes,
); );
let transport = asb::transport::new(&identity)?; let transport = asb::transport::new(&identity)?;

View File

@ -248,7 +248,7 @@ async fn start_alice(
resume_only, resume_only,
env_config, env_config,
XmrBtcNamespace::Testnet, XmrBtcNamespace::Testnet,
None, &[],
) )
.unwrap(); .unwrap();
swarm.listen_on(listen_address).unwrap(); swarm.listen_on(listen_address).unwrap();