Initial working version

This commit is contained in:
Thomas Eizinger 2021-01-18 15:24:09 +11:00
parent 6877ae4d6f
commit d5d7389d25
No known key found for this signature in database
GPG key ID: 651AC83A6C6C8B96
3 changed files with 92 additions and 134 deletions

View file

@ -13,7 +13,7 @@ log = "0.4"
[dev-dependencies] [dev-dependencies]
anyhow = "1" anyhow = "1"
serde_cbor = "0.11" serde_cbor = "0.11"
tokio = { version = "1", features = ["macros", "rt", "time"] } tokio = { version = "1", features = ["macros", "rt", "time", "rt-multi-thread"] }
libp2p = { version = "0.34", default-features = false, features = ["noise", "yamux"] } libp2p = { version = "0.34", default-features = false, features = ["noise", "yamux"] }
rand = "0.8" rand = "0.8"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }

View file

@ -1,16 +1,15 @@
use libp2p::swarm::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, KeepAlive, SubstreamProtocol, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, PollParameters, NotifyHandler}; use libp2p::swarm::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, KeepAlive, SubstreamProtocol, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, PollParameters, NotifyHandler};
use libp2p::futures::task::{Context, Poll}; use libp2p::futures::task::{Context, Poll};
use libp2p::{InboundUpgrade, PeerId, OutboundUpgrade}; use libp2p::{InboundUpgrade, PeerId, OutboundUpgrade};
use libp2p::core::{UpgradeInfo, Multiaddr}; use libp2p::core::{UpgradeInfo, Multiaddr, ConnectedPoint};
use libp2p::futures::{FutureExt}; use libp2p::futures::{FutureExt};
use std::future::{Ready, Future}; use std::future::{Ready, Future};
use std::convert::Infallible; use std::convert::Infallible;
use libp2p::futures::future::BoxFuture; use libp2p::futures::future::BoxFuture;
use std::collections::VecDeque; use std::collections::{VecDeque, HashMap};
use libp2p::swarm::protocols_handler::OutboundUpgradeSend; use libp2p::swarm::protocols_handler::OutboundUpgradeSend;
use libp2p::core::connection::ConnectionId; use libp2p::core::connection::{ConnectionId};
use std::iter; use std::iter;
use std::task::Waker;
#[cfg(test)] #[cfg(test)]
mod swarm_harness; mod swarm_harness;
@ -113,8 +112,6 @@ impl<TInboundOut, TOutboundOut, TErr> ProtocolsHandler for NMessageHandler<TInbo
} }
fn inject_fully_negotiated_inbound(&mut self, protocol: NegotiatedSubstream, _: Self::InboundOpenInfo) { fn inject_fully_negotiated_inbound(&mut self, protocol: NegotiatedSubstream, _: Self::InboundOpenInfo) {
log::info!("inject_fully_negotiated_inbound");
if let Some(future_fn) = self.inbound_future_fn.take() { if let Some(future_fn) = self.inbound_future_fn.take() {
self.inbound_future = Some(future_fn(protocol)) self.inbound_future = Some(future_fn(protocol))
} else { } else {
@ -123,8 +120,6 @@ impl<TInboundOut, TOutboundOut, TErr> ProtocolsHandler for NMessageHandler<TInbo
} }
fn inject_fully_negotiated_outbound(&mut self, protocol: NegotiatedSubstream, _: Self::OutboundOpenInfo) { fn inject_fully_negotiated_outbound(&mut self, protocol: NegotiatedSubstream, _: Self::OutboundOpenInfo) {
log::info!("inject_fully_negotiated_outbound");
if let Some(future_fn) = self.outbound_future_fn.take() { if let Some(future_fn) = self.outbound_future_fn.take() {
self.outbound_future = Some(future_fn(protocol)) self.outbound_future = Some(future_fn(protocol))
} else { } else {
@ -135,12 +130,8 @@ impl<TInboundOut, TOutboundOut, TErr> ProtocolsHandler for NMessageHandler<TInbo
fn inject_event(&mut self, event: Self::InEvent) { fn inject_event(&mut self, event: Self::InEvent) {
match event { match event {
ProtocolInEvent::ExecuteInbound(protocol_fn) => { ProtocolInEvent::ExecuteInbound(protocol_fn) => {
log::trace!("got execute inbound event");
match self.inbound_substream.take() { match self.inbound_substream.take() {
Some(substream) => { Some(substream) => {
log::trace!("got inbound substream, upgrading with custom protocol");
self.inbound_future = Some(protocol_fn(substream)) self.inbound_future = Some(protocol_fn(substream))
} }
None => { None => {
@ -149,14 +140,10 @@ impl<TInboundOut, TOutboundOut, TErr> ProtocolsHandler for NMessageHandler<TInbo
} }
} }
ProtocolInEvent::ExecuteOutbound(protocol_fn) => { ProtocolInEvent::ExecuteOutbound(protocol_fn) => {
log::trace!("got execute outbound event");
self.substream_request = Some(SubstreamProtocol::new(NMessageProtocol::new(self.info), ())); self.substream_request = Some(SubstreamProtocol::new(NMessageProtocol::new(self.info), ()));
match self.outbound_substream.take() { match self.outbound_substream.take() {
Some(substream) => { Some(substream) => {
log::trace!("got outbound substream, upgrading with custom protocol");
self.outbound_future = Some(protocol_fn(substream)); self.outbound_future = Some(protocol_fn(substream));
} }
None => { None => {
@ -183,7 +170,7 @@ impl<TInboundOut, TOutboundOut, TErr> ProtocolsHandler for NMessageHandler<TInbo
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol }) return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol })
} }
if let Some(future) = self.inbound_future.as_mut() { if let Some(mut future) = self.inbound_future.take() {
match future.poll_unpin(cx) { match future.poll_unpin(cx) {
Poll::Ready(Ok(value)) => { Poll::Ready(Ok(value)) => {
return Poll::Ready(ProtocolsHandlerEvent::Custom(ProtocolOutEvent::InboundFinished(value))) return Poll::Ready(ProtocolsHandlerEvent::Custom(ProtocolOutEvent::InboundFinished(value)))
@ -191,11 +178,14 @@ impl<TInboundOut, TOutboundOut, TErr> ProtocolsHandler for NMessageHandler<TInbo
Poll::Ready(Err(e)) => { Poll::Ready(Err(e)) => {
return Poll::Ready(ProtocolsHandlerEvent::Custom(ProtocolOutEvent::InboundFailed(e))) return Poll::Ready(ProtocolsHandlerEvent::Custom(ProtocolOutEvent::InboundFailed(e)))
} }
Poll::Pending => {} Poll::Pending => {
self.inbound_future = Some(future);
return Poll::Pending
}
} }
} }
if let Some(future) = self.outbound_future.as_mut() { if let Some(mut future) = self.outbound_future.take() {
match future.poll_unpin(cx) { match future.poll_unpin(cx) {
Poll::Ready(Ok(value)) => { Poll::Ready(Ok(value)) => {
return Poll::Ready(ProtocolsHandlerEvent::Custom(ProtocolOutEvent::OutboundFinished(value))) return Poll::Ready(ProtocolsHandlerEvent::Custom(ProtocolOutEvent::OutboundFinished(value)))
@ -203,7 +193,10 @@ impl<TInboundOut, TOutboundOut, TErr> ProtocolsHandler for NMessageHandler<TInbo
Poll::Ready(Err(e)) => { Poll::Ready(Err(e)) => {
return Poll::Ready(ProtocolsHandlerEvent::Custom(ProtocolOutEvent::OutboundFailed(e))) return Poll::Ready(ProtocolsHandlerEvent::Custom(ProtocolOutEvent::OutboundFailed(e)))
} }
Poll::Pending => {} Poll::Pending => {
self.outbound_future = Some(future);
return Poll::Pending
}
} }
} }
@ -215,11 +208,9 @@ pub struct NMessageBehaviour<I, O, E> {
protocol_in_events: VecDeque<(PeerId, ProtocolInEvent<I, O, E>)>, protocol_in_events: VecDeque<(PeerId, ProtocolInEvent<I, O, E>)>,
protocol_out_events: VecDeque<(PeerId, ProtocolOutEvent<I, O, E>)>, protocol_out_events: VecDeque<(PeerId, ProtocolOutEvent<I, O, E>)>,
waker: Option<Waker>, connected_peers: HashMap<PeerId, Vec<Multiaddr>>,
connected_peers: Vec<PeerId>, info: &'static [u8],
info: &'static [u8]
} }
impl<I, O, E> NMessageBehaviour<I, O, E> { impl<I, O, E> NMessageBehaviour<I, O, E> {
@ -234,10 +225,9 @@ impl<I, O, E> NMessageBehaviour<I, O, E> {
/// ``` /// ```
pub fn new(info: &'static [u8]) -> Self { pub fn new(info: &'static [u8]) -> Self {
Self { Self {
protocol_in_events: Default::default(), protocol_in_events: VecDeque::default(),
protocol_out_events: Default::default(), protocol_out_events: VecDeque::default(),
waker: None, connected_peers: HashMap::default(),
connected_peers: vec![],
info info
} }
} }
@ -246,22 +236,10 @@ impl<I, O, E> NMessageBehaviour<I, O, E> {
impl<I, O, E> NMessageBehaviour<I, O, E> { impl<I, O, E> NMessageBehaviour<I, O, E> {
pub fn do_protocol_listener<F>(&mut self, peer: PeerId, protocol: impl FnOnce(NegotiatedSubstream) -> F + Send + 'static ) where F: Future<Output = Result<I, E>> + Send + 'static { pub fn do_protocol_listener<F>(&mut self, peer: PeerId, protocol: impl FnOnce(NegotiatedSubstream) -> F + Send + 'static ) where F: Future<Output = Result<I, E>> + Send + 'static {
self.protocol_in_events.push_back((peer, ProtocolInEvent::ExecuteInbound(Box::new(move |substream| protocol(substream).boxed())))); self.protocol_in_events.push_back((peer, ProtocolInEvent::ExecuteInbound(Box::new(move |substream| protocol(substream).boxed()))));
log::info!("pushing ExecuteInbound event");
if let Some(waker) = self.waker.take() {
log::trace!("waking task");
waker.wake();
}
} }
pub fn do_protocol_dialer<F>(&mut self, peer: PeerId, protocol: impl FnOnce(NegotiatedSubstream) -> F + Send + 'static ) where F: Future<Output = Result<O, E>> + Send + 'static { pub fn do_protocol_dialer<F>(&mut self, peer: PeerId, protocol: impl FnOnce(NegotiatedSubstream) -> F + Send + 'static ) where F: Future<Output = Result<O, E>> + Send + 'static {
self.protocol_in_events.push_back((peer, ProtocolInEvent::ExecuteOutbound(Box::new(move |substream| protocol(substream).boxed())))); self.protocol_in_events.push_back((peer, ProtocolInEvent::ExecuteOutbound(Box::new(move |substream| protocol(substream).boxed()))));
if let Some(waker) = self.waker.take() {
waker.wake();
}
} }
} }
@ -281,32 +259,38 @@ impl<I, O, E> NetworkBehaviour for NMessageBehaviour<I, O, E> where I: Send + 's
NMessageHandler::new(self.info) NMessageHandler::new(self.info)
} }
fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> { fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
Vec::new() self.connected_peers.get(peer).cloned().unwrap_or_default()
} }
fn inject_connected(&mut self, peer: &PeerId) { fn inject_connected(&mut self, _: &PeerId) {
self.connected_peers.push(peer.clone());
} }
fn inject_disconnected(&mut self, peer: &PeerId) { fn inject_disconnected(&mut self, _: &PeerId) {
self.connected_peers.retain(|p| p != peer) }
fn inject_connection_established(&mut self, peer: &PeerId, _: &ConnectionId, point: &ConnectedPoint) {
let multiaddr = point.get_remote_address().clone();
self.connected_peers.entry(*peer).or_default().push(multiaddr);
}
fn inject_connection_closed(&mut self, peer: &PeerId, _: &ConnectionId, point: &ConnectedPoint) {
let multiaddr = point.get_remote_address();
self.connected_peers.entry(*peer).or_default().retain(|addr| addr != multiaddr);
} }
fn inject_event(&mut self, peer: PeerId, _: ConnectionId, event: ProtocolOutEvent<I, O, E>) { fn inject_event(&mut self, peer: PeerId, _: ConnectionId, event: ProtocolOutEvent<I, O, E>) {
self.protocol_out_events.push_back((peer, event)); self.protocol_out_events.push_back((peer, event));
} }
fn poll(&mut self, cx: &mut Context<'_>, params: &mut impl PollParameters) -> Poll<NetworkBehaviourAction<ProtocolInEvent<I, O, E>, Self::OutEvent>> { fn poll(&mut self, _: &mut Context<'_>, _: &mut impl PollParameters) -> Poll<NetworkBehaviourAction<ProtocolInEvent<I, O, E>, Self::OutEvent>> {
log::debug!("peer {}, no. events {}", params.local_peer_id(), self.protocol_in_events.len());
if let Some((peer, event)) = self.protocol_in_events.pop_front() { if let Some((peer, event)) = self.protocol_in_events.pop_front() {
log::debug!("notifying handler"); if !self.connected_peers.contains_key(&peer) {
if !self.connected_peers.contains(&peer) {
log::info!("not connected to peer {}, waiting ...", peer);
self.protocol_in_events.push_back((peer, event)); self.protocol_in_events.push_back((peer, event));
} else { } else {
return Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id: peer, handler: NotifyHandler::Any, event}) return Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id: peer, handler: NotifyHandler::Any, event})
} }
} }
@ -320,8 +304,6 @@ impl<I, O, E> NetworkBehaviour for NMessageBehaviour<I, O, E> where I: Send + 's
})) }))
} }
self.waker = Some(cx.waker().clone());
Poll::Pending Poll::Pending
} }
} }
@ -333,7 +315,8 @@ mod tests {
use anyhow::{Context, Error}; use anyhow::{Context, Error};
use swarm_harness::new_connected_swarm_pair; use swarm_harness::new_connected_swarm_pair;
use libp2p::swarm::SwarmEvent; use libp2p::swarm::SwarmEvent;
use libp2p::futures::future::join; use crate::swarm_harness::await_events_or_timeout;
use tokio::runtime::Handle;
#[derive(serde::Serialize, serde::Deserialize)] #[derive(serde::Serialize, serde::Deserialize)]
#[derive(Debug)] #[derive(Debug)]
@ -395,27 +378,17 @@ mod tests {
impl MyBehaviour { impl MyBehaviour {
fn alice_do_protocol(&mut self, bob: PeerId, foo: u32, baz: u32) { fn alice_do_protocol(&mut self, bob: PeerId, foo: u32, baz: u32) {
self.inner.do_protocol_dialer(bob, move |mut substream| async move { self.inner.do_protocol_dialer(bob, move |mut substream| async move {
log::trace!("alice starting protocol"); upgrade::write_with_len_prefix(&mut substream, serde_cbor::to_vec(&Message0 {
upgrade::write_one(&mut substream, serde_cbor::to_vec(&Message0 {
foo foo
}).context("failed to serialize Message0")?).await?; }).context("failed to serialize Message0")?).await?;
log::trace!("alice sent message0");
let bytes = upgrade::read_one(&mut substream, 1024).await?; let bytes = upgrade::read_one(&mut substream, 1024).await?;
let message1 = serde_cbor::from_slice::<Message1>(&bytes)?; let message1 = serde_cbor::from_slice::<Message1>(&bytes)?;
log::trace!("alice read message1"); upgrade::write_with_len_prefix(&mut substream, serde_cbor::to_vec(&Message2 {
upgrade::write_one(&mut substream, serde_cbor::to_vec(&Message2 {
baz baz
}).context("failed to serialize Message2")?).await?; }).context("failed to serialize Message2")?).await?;
log::trace!("alice sent message2");
log::trace!("alice finished");
Ok(AliceResult { Ok(AliceResult {
bar: message1.bar bar: message1.bar
}) })
@ -424,26 +397,16 @@ mod tests {
fn bob_do_protocol(&mut self, alice: PeerId, bar: u32) { fn bob_do_protocol(&mut self, alice: PeerId, bar: u32) {
self.inner.do_protocol_listener(alice, move |mut substream| async move { self.inner.do_protocol_listener(alice, move |mut substream| async move {
log::trace!("bob start protocol");
let bytes = upgrade::read_one(&mut substream, 1024).await?; let bytes = upgrade::read_one(&mut substream, 1024).await?;
let message0 = serde_cbor::from_slice::<Message0>(&bytes)?; let message0 = serde_cbor::from_slice::<Message0>(&bytes)?;
log::trace!("bob read message0"); upgrade::write_with_len_prefix(&mut substream, serde_cbor::to_vec(&Message1 {
upgrade::write_one(&mut substream, serde_cbor::to_vec(&Message1 {
bar bar
}).context("failed to serialize Message1")?).await?; }).context("failed to serialize Message1")?).await?;
log::trace!("bob sent message1");
let bytes = upgrade::read_one(&mut substream, 1024).await?; let bytes = upgrade::read_one(&mut substream, 1024).await?;
let message2 = serde_cbor::from_slice::<Message2>(&bytes)?; let message2 = serde_cbor::from_slice::<Message2>(&bytes)?;
log::trace!("bob read message2");
log::trace!("bob finished");
Ok(BobResult { Ok(BobResult {
foo: message0.foo, foo: message0.foo,
baz: message2.baz baz: message2.baz
@ -456,23 +419,17 @@ mod tests {
async fn it_works() { async fn it_works() {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
let (mut alice, mut bob) = new_connected_swarm_pair(|_, _| MyBehaviour::new()).await; let (mut alice, mut bob) = new_connected_swarm_pair(|_, _| MyBehaviour::new(), Handle::current()).await;
log::info!("alice = {}", alice.peer_id);
log::info!("bob = {}", bob.peer_id);
alice.swarm.alice_do_protocol(bob.peer_id, 10, 42); alice.swarm.alice_do_protocol(bob.peer_id, 10, 42);
bob.swarm.bob_do_protocol(alice.peer_id, 1337); bob.swarm.bob_do_protocol(alice.peer_id, 1337);
let alice_handle = tokio::spawn(async move { alice.swarm.next_event().await }); let (alice_event, bob_event) = await_events_or_timeout(alice.swarm.next_event(), bob.swarm.next_event()).await;
let bob_handle = tokio::spawn(async move { bob.swarm.next_event().await });
let (alice_event, bob_event) = join(alice_handle, bob_handle).await; assert!(matches!(alice_event, SwarmEvent::Behaviour(MyOutEvent::Alice(AliceResult {
assert!(matches!(dbg!(alice_event.unwrap()), SwarmEvent::Behaviour(MyOutEvent::Alice(AliceResult {
bar: 1337 bar: 1337
})))); }))));
assert!(matches!(dbg!(bob_event.unwrap()), SwarmEvent::Behaviour(MyOutEvent::Bob(BobResult { assert!(matches!(bob_event, SwarmEvent::Behaviour(MyOutEvent::Bob(BobResult {
foo: 10, foo: 10,
baz: 42 baz: 42
})))); }))));

View file

@ -1,7 +1,7 @@
use libp2p::futures::future; use libp2p::futures::future;
use libp2p::{ use libp2p::{
core::{ core::{
muxing::StreamMuxerBox, transport::memory::MemoryTransport, upgrade::Version, Executor, muxing::StreamMuxerBox, transport::memory::MemoryTransport, upgrade::Version,
}, },
identity, identity,
noise::{self, NoiseConfig, X25519Spec}, noise::{self, NoiseConfig, X25519Spec},
@ -9,18 +9,10 @@ use libp2p::{
yamux::YamuxConfig, yamux::YamuxConfig,
Multiaddr, PeerId, Swarm, Transport, Multiaddr, PeerId, Swarm, Transport,
}; };
use std::{fmt::Debug, future::Future, pin::Pin, time::Duration}; use std::{fmt::Debug, future::Future, time::Duration};
use tokio::time; use tokio::time;
use tokio::runtime::Handle;
/// An adaptor struct for libp2p that spawns futures into the current use libp2p::futures::future::FutureExt;
/// thread-local runtime.
struct GlobalSpawnTokioExecutor;
impl Executor for GlobalSpawnTokioExecutor {
fn exec(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
let _ = tokio::spawn(future);
}
}
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct Actor<B: NetworkBehaviour> { pub struct Actor<B: NetworkBehaviour> {
@ -29,19 +21,19 @@ pub struct Actor<B: NetworkBehaviour> {
pub peer_id: PeerId, pub peer_id: PeerId,
} }
pub async fn new_connected_swarm_pair<B, F>(behaviour_fn: F) -> (Actor<B>, Actor<B>) pub async fn new_connected_swarm_pair<B, F>(behaviour_fn: F, handle: Handle) -> (Actor<B>, Actor<B>)
where where
B: NetworkBehaviour, B: NetworkBehaviour,
F: Fn(PeerId, identity::Keypair) -> B + Clone, F: Fn(PeerId, identity::Keypair) -> B + Clone,
<B as NetworkBehaviour>::OutEvent: Debug{ <B as NetworkBehaviour>::OutEvent: Debug{
let (swarm, addr, peer_id) = new_swarm(behaviour_fn.clone()); let (swarm, addr, peer_id) = new_swarm(behaviour_fn.clone(), handle.clone());
let mut alice = Actor { let mut alice = Actor {
swarm, swarm,
addr, addr,
peer_id, peer_id,
}; };
let (swarm, addr, peer_id) = new_swarm(behaviour_fn); let (swarm, addr, peer_id) = new_swarm(behaviour_fn, handle);
let mut bob = Actor { let mut bob = Actor {
swarm, swarm,
addr, addr,
@ -53,7 +45,7 @@ where
(alice, bob) (alice, bob)
} }
pub fn new_swarm<B: NetworkBehaviour, F: Fn(PeerId, identity::Keypair) -> B>(behaviour_fn: F) -> (Swarm<B>, Multiaddr, PeerId) { pub fn new_swarm<B: NetworkBehaviour, F: Fn(PeerId, identity::Keypair) -> B>(behaviour_fn: F, handle: Handle) -> (Swarm<B>, Multiaddr, PeerId) {
let id_keys = identity::Keypair::generate_ed25519(); let id_keys = identity::Keypair::generate_ed25519();
let peer_id = PeerId::from(id_keys.public()); let peer_id = PeerId::from(id_keys.public());
@ -74,7 +66,9 @@ pub fn new_swarm<B: NetworkBehaviour, F: Fn(PeerId, identity::Keypair) -> B>(beh
behaviour_fn(peer_id.clone(), id_keys), behaviour_fn(peer_id.clone(), id_keys),
peer_id.clone(), peer_id.clone(),
) )
.executor(Box::new(GlobalSpawnTokioExecutor)) .executor(Box::new(move |f| {
handle.spawn(f);
}))
.build(); .build();
let address_port = rand::random::<u64>(); let address_port = rand::random::<u64>();
@ -116,35 +110,42 @@ pub async fn connect<B>(alice: &mut Swarm<B>, bob: &mut Swarm<B>)
let mut alice_connected = false; let mut alice_connected = false;
let mut bob_connected = false; let mut bob_connected = false;
while !alice_connected && !bob_connected { while !(alice_connected && bob_connected) {
let (alice_event, bob_event) = future::join(alice.next_event(), bob.next_event()).await; libp2p::futures::select! {
alice_event = alice.next_event().fuse() => {
match alice_event {
SwarmEvent::ConnectionEstablished { .. } => {
log::info!("alice connected");
alice_connected = true;
}
SwarmEvent::Behaviour(event) => {
panic!(
"alice unexpectedly emitted a behaviour event during connection: {:?}",
event
);
}
_ => {}
}
}
match alice_event { bob_event = bob.next_event().fuse() => {
SwarmEvent::ConnectionEstablished { .. } => { match bob_event {
alice_connected = true; SwarmEvent::ConnectionEstablished { .. } => {
log::info!("bob connected");
bob_connected = true;
}
SwarmEvent::NewListenAddr(addr) => {
Swarm::dial_addr(alice, addr).unwrap();
}
SwarmEvent::Behaviour(event) => {
panic!(
"bob unexpectedly emitted a behaviour event during connection: {:?}",
event
);
}
_ => {}
}
} }
SwarmEvent::Behaviour(event) => {
panic!(
"alice unexpectedly emitted a behaviour event during connection: {:?}",
event
);
}
_ => {}
}
match bob_event {
SwarmEvent::ConnectionEstablished { .. } => {
bob_connected = true;
}
SwarmEvent::NewListenAddr(addr) => {
Swarm::dial_addr(alice, addr).unwrap();
}
SwarmEvent::Behaviour(event) => {
panic!(
"bob unexpectedly emitted a behaviour event during connection: {:?}",
event
);
}
_ => {}
} }
} }
} }