Extend SwarmExt with ability to listen on TCP localhost

The CLI's transport doesn't support memory addresses and it also shouldn't support those by default. To be able to use it in tests, we extend the `SwarmExt` trait with the ability to listen on local TCP addresses with a random port.
This commit is contained in:
Thomas Eizinger 2021-07-06 12:17:10 +10:00
parent 93a0692998
commit e642f5c148
No known key found for this signature in database
GPG Key ID: 651AC83A6C6C8B96

View File

@ -9,6 +9,7 @@ use libp2p::core::{identity, Executor, Multiaddr, PeerId, Transport};
use libp2p::mplex::MplexConfig; use libp2p::mplex::MplexConfig;
use libp2p::noise::{Keypair, NoiseConfig, X25519Spec}; use libp2p::noise::{Keypair, NoiseConfig, X25519Spec};
use libp2p::swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent}; use libp2p::swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent};
use libp2p::tcp::TokioTcpConfig;
use libp2p::yamux::YamuxConfig; use libp2p::yamux::YamuxConfig;
use std::fmt::Debug; use std::fmt::Debug;
use std::pin::Pin; use std::pin::Pin;
@ -40,6 +41,7 @@ where
let noise = NoiseConfig::xx(dh_keys).into_authenticated(); let noise = NoiseConfig::xx(dh_keys).into_authenticated();
let transport = MemoryTransport::default() let transport = MemoryTransport::default()
.or_transport(TokioTcpConfig::new())
.upgrade(Version::V1) .upgrade(Version::V1)
.authenticate(noise) .authenticate(noise)
.multiplex(SelectUpgrade::new( .multiplex(SelectUpgrade::new(
@ -57,11 +59,19 @@ where
fn get_rand_memory_address() -> Multiaddr { fn get_rand_memory_address() -> Multiaddr {
let address_port = rand::random::<u64>(); let address_port = rand::random::<u64>();
let addr = format!("/memory/{}", address_port)
.parse::<Multiaddr>()
.unwrap();
addr format!("/memory/{}", address_port).parse().unwrap()
}
async fn get_local_tcp_address() -> Multiaddr {
let random_port = {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
listener.local_addr().unwrap().port()
};
format!("/ip4/127.0.0.1/tcp/{}", random_port)
.parse()
.unwrap()
} }
pub async fn await_events_or_timeout<A, B, E1, E2>( pub async fn await_events_or_timeout<A, B, E1, E2>(
@ -101,6 +111,10 @@ pub trait SwarmExt {
/// Listens on a random memory address, polling the [`Swarm`] until the /// Listens on a random memory address, polling the [`Swarm`] until the
/// transport is ready to accept connections. /// transport is ready to accept connections.
async fn listen_on_random_memory_address(&mut self) -> Multiaddr; async fn listen_on_random_memory_address(&mut self) -> Multiaddr;
/// Listens on a TCP port for localhost only, polling the [`Swarm`] until
/// the transport is ready to accept connections.
async fn listen_on_tcp_localhost(&mut self) -> Multiaddr;
} }
#[async_trait] #[async_trait]
@ -163,21 +177,7 @@ where
let multiaddr = get_rand_memory_address(); let multiaddr = get_rand_memory_address();
self.listen_on(multiaddr.clone()).unwrap(); self.listen_on(multiaddr.clone()).unwrap();
block_until_listening_on(self, &multiaddr).await;
// block until we are actually listening
loop {
match self.select_next_some().await {
SwarmEvent::NewListenAddr(addr) if addr == multiaddr => {
break;
}
other => {
tracing::debug!(
"Ignoring {:?} while waiting for listening to succeed",
other
);
}
}
}
// Memory addresses are externally reachable because they all share the same // Memory addresses are externally reachable because they all share the same
// memory-space. // memory-space.
@ -185,4 +185,33 @@ where
multiaddr multiaddr
} }
async fn listen_on_tcp_localhost(&mut self) -> Multiaddr {
let multiaddr = get_local_tcp_address().await;
self.listen_on(multiaddr.clone()).unwrap();
block_until_listening_on(self, &multiaddr).await;
multiaddr
}
}
async fn block_until_listening_on<B>(swarm: &mut Swarm<B>, multiaddr: &Multiaddr)
where
B: NetworkBehaviour,
<B as NetworkBehaviour>::OutEvent: Debug,
{
loop {
match swarm.select_next_some().await {
SwarmEvent::NewListenAddr(addr) if &addr == multiaddr => {
break;
}
other => {
tracing::debug!(
"Ignoring {:?} while waiting for listening to succeed",
other
);
}
}
}
} }