From d5d7389d25db7120f1073ea1f6ead237fcd47abd Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 18 Jan 2021 15:24:09 +1100 Subject: [PATCH] Initial working version --- Cargo.toml | 2 +- src/lib.rs | 135 +++++++++++++++---------------------------- src/swarm_harness.rs | 89 ++++++++++++++-------------- 3 files changed, 92 insertions(+), 134 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fb038fb1..2284a676 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ log = "0.4" [dev-dependencies] anyhow = "1" serde_cbor = "0.11" -tokio = { version = "1", features = ["macros", "rt", "time"] } +tokio = { version = "1", features = ["macros", "rt", "time", "rt-multi-thread"] } libp2p = { version = "0.34", default-features = false, features = ["noise", "yamux"] } rand = "0.8" serde = { version = "1", features = ["derive"] } diff --git a/src/lib.rs b/src/lib.rs index 4ec82be3..8aae2c1a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,16 +1,15 @@ use libp2p::swarm::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, KeepAlive, SubstreamProtocol, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, PollParameters, NotifyHandler}; use libp2p::futures::task::{Context, Poll}; use libp2p::{InboundUpgrade, PeerId, OutboundUpgrade}; -use libp2p::core::{UpgradeInfo, Multiaddr}; +use libp2p::core::{UpgradeInfo, Multiaddr, ConnectedPoint}; use libp2p::futures::{FutureExt}; use std::future::{Ready, Future}; use std::convert::Infallible; use libp2p::futures::future::BoxFuture; -use std::collections::VecDeque; +use std::collections::{VecDeque, HashMap}; use libp2p::swarm::protocols_handler::OutboundUpgradeSend; -use libp2p::core::connection::ConnectionId; +use libp2p::core::connection::{ConnectionId}; use std::iter; -use std::task::Waker; #[cfg(test)] mod swarm_harness; @@ -113,8 +112,6 @@ impl ProtocolsHandler for NMessageHandler ProtocolsHandler for NMessageHandler ProtocolsHandler for NMessageHandler { - log::trace!("got execute inbound event"); - match self.inbound_substream.take() { Some(substream) => { - log::trace!("got inbound substream, upgrading with custom protocol"); - self.inbound_future = Some(protocol_fn(substream)) } None => { @@ -149,14 +140,10 @@ impl ProtocolsHandler for NMessageHandler { - log::trace!("got execute outbound event"); - self.substream_request = Some(SubstreamProtocol::new(NMessageProtocol::new(self.info), ())); match self.outbound_substream.take() { Some(substream) => { - log::trace!("got outbound substream, upgrading with custom protocol"); - self.outbound_future = Some(protocol_fn(substream)); } None => { @@ -183,7 +170,7 @@ impl ProtocolsHandler for NMessageHandler { return Poll::Ready(ProtocolsHandlerEvent::Custom(ProtocolOutEvent::InboundFinished(value))) @@ -191,11 +178,14 @@ impl ProtocolsHandler for NMessageHandler { return Poll::Ready(ProtocolsHandlerEvent::Custom(ProtocolOutEvent::InboundFailed(e))) } - Poll::Pending => {} + Poll::Pending => { + self.inbound_future = Some(future); + return Poll::Pending + } } } - if let Some(future) = self.outbound_future.as_mut() { + if let Some(mut future) = self.outbound_future.take() { match future.poll_unpin(cx) { Poll::Ready(Ok(value)) => { return Poll::Ready(ProtocolsHandlerEvent::Custom(ProtocolOutEvent::OutboundFinished(value))) @@ -203,7 +193,10 @@ impl ProtocolsHandler for NMessageHandler { return Poll::Ready(ProtocolsHandlerEvent::Custom(ProtocolOutEvent::OutboundFailed(e))) } - Poll::Pending => {} + Poll::Pending => { + self.outbound_future = Some(future); + return Poll::Pending + } } } @@ -215,11 +208,9 @@ pub struct NMessageBehaviour { protocol_in_events: VecDeque<(PeerId, ProtocolInEvent)>, protocol_out_events: VecDeque<(PeerId, ProtocolOutEvent)>, - waker: Option, + connected_peers: HashMap>, - connected_peers: Vec, - - info: &'static [u8] + info: &'static [u8], } impl NMessageBehaviour { @@ -234,10 +225,9 @@ impl NMessageBehaviour { /// ``` pub fn new(info: &'static [u8]) -> Self { Self { - protocol_in_events: Default::default(), - protocol_out_events: Default::default(), - waker: None, - connected_peers: vec![], + protocol_in_events: VecDeque::default(), + protocol_out_events: VecDeque::default(), + connected_peers: HashMap::default(), info } } @@ -246,22 +236,10 @@ impl NMessageBehaviour { impl NMessageBehaviour { pub fn do_protocol_listener(&mut self, peer: PeerId, protocol: impl FnOnce(NegotiatedSubstream) -> F + Send + 'static ) where F: Future> + Send + 'static { self.protocol_in_events.push_back((peer, ProtocolInEvent::ExecuteInbound(Box::new(move |substream| protocol(substream).boxed())))); - - log::info!("pushing ExecuteInbound event"); - - if let Some(waker) = self.waker.take() { - log::trace!("waking task"); - - waker.wake(); - } } pub fn do_protocol_dialer(&mut self, peer: PeerId, protocol: impl FnOnce(NegotiatedSubstream) -> F + Send + 'static ) where F: Future> + Send + 'static { self.protocol_in_events.push_back((peer, ProtocolInEvent::ExecuteOutbound(Box::new(move |substream| protocol(substream).boxed())))); - - if let Some(waker) = self.waker.take() { - waker.wake(); - } } } @@ -281,32 +259,38 @@ impl NetworkBehaviour for NMessageBehaviour where I: Send + 's NMessageHandler::new(self.info) } - fn addresses_of_peer(&mut self, _: &PeerId) -> Vec { - Vec::new() + fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { + self.connected_peers.get(peer).cloned().unwrap_or_default() } - fn inject_connected(&mut self, peer: &PeerId) { - self.connected_peers.push(peer.clone()); + fn inject_connected(&mut self, _: &PeerId) { } - fn inject_disconnected(&mut self, peer: &PeerId) { - self.connected_peers.retain(|p| p != peer) + fn inject_disconnected(&mut self, _: &PeerId) { + } + + fn inject_connection_established(&mut self, peer: &PeerId, _: &ConnectionId, point: &ConnectedPoint) { + let multiaddr = point.get_remote_address().clone(); + + self.connected_peers.entry(*peer).or_default().push(multiaddr); + } + + fn inject_connection_closed(&mut self, peer: &PeerId, _: &ConnectionId, point: &ConnectedPoint) { + let multiaddr = point.get_remote_address(); + + self.connected_peers.entry(*peer).or_default().retain(|addr| addr != multiaddr); } fn inject_event(&mut self, peer: PeerId, _: ConnectionId, event: ProtocolOutEvent) { self.protocol_out_events.push_back((peer, event)); } - fn poll(&mut self, cx: &mut Context<'_>, params: &mut impl PollParameters) -> Poll, Self::OutEvent>> { - log::debug!("peer {}, no. events {}", params.local_peer_id(), self.protocol_in_events.len()); - + fn poll(&mut self, _: &mut Context<'_>, _: &mut impl PollParameters) -> Poll, Self::OutEvent>> { if let Some((peer, event)) = self.protocol_in_events.pop_front() { - log::debug!("notifying handler"); - - if !self.connected_peers.contains(&peer) { - log::info!("not connected to peer {}, waiting ...", peer); + if !self.connected_peers.contains_key(&peer) { self.protocol_in_events.push_back((peer, event)); } else { + return Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id: peer, handler: NotifyHandler::Any, event}) } } @@ -320,8 +304,6 @@ impl NetworkBehaviour for NMessageBehaviour where I: Send + 's })) } - self.waker = Some(cx.waker().clone()); - Poll::Pending } } @@ -333,7 +315,8 @@ mod tests { use anyhow::{Context, Error}; use swarm_harness::new_connected_swarm_pair; use libp2p::swarm::SwarmEvent; - use libp2p::futures::future::join; + use crate::swarm_harness::await_events_or_timeout; + use tokio::runtime::Handle; #[derive(serde::Serialize, serde::Deserialize)] #[derive(Debug)] @@ -395,27 +378,17 @@ mod tests { impl MyBehaviour { fn alice_do_protocol(&mut self, bob: PeerId, foo: u32, baz: u32) { self.inner.do_protocol_dialer(bob, move |mut substream| async move { - log::trace!("alice starting protocol"); - - upgrade::write_one(&mut substream, serde_cbor::to_vec(&Message0 { + upgrade::write_with_len_prefix(&mut substream, serde_cbor::to_vec(&Message0 { foo }).context("failed to serialize Message0")?).await?; - log::trace!("alice sent message0"); - let bytes = upgrade::read_one(&mut substream, 1024).await?; let message1 = serde_cbor::from_slice::(&bytes)?; - log::trace!("alice read message1"); - - upgrade::write_one(&mut substream, serde_cbor::to_vec(&Message2 { + upgrade::write_with_len_prefix(&mut substream, serde_cbor::to_vec(&Message2 { baz }).context("failed to serialize Message2")?).await?; - log::trace!("alice sent message2"); - - log::trace!("alice finished"); - Ok(AliceResult { bar: message1.bar }) @@ -424,26 +397,16 @@ mod tests { fn bob_do_protocol(&mut self, alice: PeerId, bar: u32) { self.inner.do_protocol_listener(alice, move |mut substream| async move { - log::trace!("bob start protocol"); - let bytes = upgrade::read_one(&mut substream, 1024).await?; let message0 = serde_cbor::from_slice::(&bytes)?; - log::trace!("bob read message0"); - - upgrade::write_one(&mut substream, serde_cbor::to_vec(&Message1 { + upgrade::write_with_len_prefix(&mut substream, serde_cbor::to_vec(&Message1 { bar }).context("failed to serialize Message1")?).await?; - log::trace!("bob sent message1"); - let bytes = upgrade::read_one(&mut substream, 1024).await?; let message2 = serde_cbor::from_slice::(&bytes)?; - log::trace!("bob read message2"); - - log::trace!("bob finished"); - Ok(BobResult { foo: message0.foo, baz: message2.baz @@ -456,23 +419,17 @@ mod tests { async fn it_works() { let _ = env_logger::try_init(); - let (mut alice, mut bob) = new_connected_swarm_pair(|_, _| MyBehaviour::new()).await; - - log::info!("alice = {}", alice.peer_id); - log::info!("bob = {}", bob.peer_id); + let (mut alice, mut bob) = new_connected_swarm_pair(|_, _| MyBehaviour::new(), Handle::current()).await; alice.swarm.alice_do_protocol(bob.peer_id, 10, 42); bob.swarm.bob_do_protocol(alice.peer_id, 1337); - let alice_handle = tokio::spawn(async move { alice.swarm.next_event().await }); - let bob_handle = tokio::spawn(async move { bob.swarm.next_event().await }); + let (alice_event, bob_event) = await_events_or_timeout(alice.swarm.next_event(), bob.swarm.next_event()).await; - let (alice_event, bob_event) = join(alice_handle, bob_handle).await; - - assert!(matches!(dbg!(alice_event.unwrap()), SwarmEvent::Behaviour(MyOutEvent::Alice(AliceResult { + assert!(matches!(alice_event, SwarmEvent::Behaviour(MyOutEvent::Alice(AliceResult { bar: 1337 })))); - assert!(matches!(dbg!(bob_event.unwrap()), SwarmEvent::Behaviour(MyOutEvent::Bob(BobResult { + assert!(matches!(bob_event, SwarmEvent::Behaviour(MyOutEvent::Bob(BobResult { foo: 10, baz: 42 })))); diff --git a/src/swarm_harness.rs b/src/swarm_harness.rs index ca2edd01..15bab452 100644 --- a/src/swarm_harness.rs +++ b/src/swarm_harness.rs @@ -1,7 +1,7 @@ use libp2p::futures::future; use libp2p::{ core::{ - muxing::StreamMuxerBox, transport::memory::MemoryTransport, upgrade::Version, Executor, + muxing::StreamMuxerBox, transport::memory::MemoryTransport, upgrade::Version, }, identity, noise::{self, NoiseConfig, X25519Spec}, @@ -9,18 +9,10 @@ use libp2p::{ yamux::YamuxConfig, Multiaddr, PeerId, Swarm, Transport, }; -use std::{fmt::Debug, future::Future, pin::Pin, time::Duration}; +use std::{fmt::Debug, future::Future, time::Duration}; use tokio::time; - -/// An adaptor struct for libp2p that spawns futures into the current -/// thread-local runtime. -struct GlobalSpawnTokioExecutor; - -impl Executor for GlobalSpawnTokioExecutor { - fn exec(&self, future: Pin + Send>>) { - let _ = tokio::spawn(future); - } -} +use tokio::runtime::Handle; +use libp2p::futures::future::FutureExt; #[allow(missing_debug_implementations)] pub struct Actor { @@ -29,19 +21,19 @@ pub struct Actor { pub peer_id: PeerId, } -pub async fn new_connected_swarm_pair(behaviour_fn: F) -> (Actor, Actor) +pub async fn new_connected_swarm_pair(behaviour_fn: F, handle: Handle) -> (Actor, Actor) where B: NetworkBehaviour, F: Fn(PeerId, identity::Keypair) -> B + Clone, ::OutEvent: Debug{ - let (swarm, addr, peer_id) = new_swarm(behaviour_fn.clone()); + let (swarm, addr, peer_id) = new_swarm(behaviour_fn.clone(), handle.clone()); let mut alice = Actor { swarm, addr, peer_id, }; - let (swarm, addr, peer_id) = new_swarm(behaviour_fn); + let (swarm, addr, peer_id) = new_swarm(behaviour_fn, handle); let mut bob = Actor { swarm, addr, @@ -53,7 +45,7 @@ where (alice, bob) } -pub fn new_swarm B>(behaviour_fn: F) -> (Swarm, Multiaddr, PeerId) { +pub fn new_swarm B>(behaviour_fn: F, handle: Handle) -> (Swarm, Multiaddr, PeerId) { let id_keys = identity::Keypair::generate_ed25519(); let peer_id = PeerId::from(id_keys.public()); @@ -74,7 +66,9 @@ pub fn new_swarm B>(beh behaviour_fn(peer_id.clone(), id_keys), peer_id.clone(), ) - .executor(Box::new(GlobalSpawnTokioExecutor)) + .executor(Box::new(move |f| { + handle.spawn(f); + })) .build(); let address_port = rand::random::(); @@ -116,35 +110,42 @@ pub async fn connect(alice: &mut Swarm, bob: &mut Swarm) let mut alice_connected = false; let mut bob_connected = false; - while !alice_connected && !bob_connected { - let (alice_event, bob_event) = future::join(alice.next_event(), bob.next_event()).await; + while !(alice_connected && bob_connected) { + libp2p::futures::select! { + alice_event = alice.next_event().fuse() => { + match alice_event { + SwarmEvent::ConnectionEstablished { .. } => { + log::info!("alice connected"); + alice_connected = true; + } + SwarmEvent::Behaviour(event) => { + panic!( + "alice unexpectedly emitted a behaviour event during connection: {:?}", + event + ); + } + _ => {} + } + } - match alice_event { - SwarmEvent::ConnectionEstablished { .. } => { - alice_connected = true; + bob_event = bob.next_event().fuse() => { + match bob_event { + SwarmEvent::ConnectionEstablished { .. } => { + log::info!("bob connected"); + bob_connected = true; + } + SwarmEvent::NewListenAddr(addr) => { + Swarm::dial_addr(alice, addr).unwrap(); + } + SwarmEvent::Behaviour(event) => { + panic!( + "bob unexpectedly emitted a behaviour event during connection: {:?}", + event + ); + } + _ => {} + } } - SwarmEvent::Behaviour(event) => { - panic!( - "alice unexpectedly emitted a behaviour event during connection: {:?}", - event - ); - } - _ => {} - } - match bob_event { - SwarmEvent::ConnectionEstablished { .. } => { - bob_connected = true; - } - SwarmEvent::NewListenAddr(addr) => { - Swarm::dial_addr(alice, addr).unwrap(); - } - SwarmEvent::Behaviour(event) => { - panic!( - "bob unexpectedly emitted a behaviour event during connection: {:?}", - event - ); - } - _ => {} } } }