Use upstream libp2p instead of our fork

The rendezvous protocol has been merged into upstream. We no longer need
 to depend on our fork. The ASB no longer works as a rendezvous point.
This commit is contained in:
rishflab 2021-10-13 09:52:39 +11:00
parent b9d2e9c3ae
commit a42484f04d
15 changed files with 226 additions and 221 deletions

View file

@ -249,11 +249,11 @@ where
channel
}.boxed());
}
SwarmEvent::Behaviour(OutEvent::Rendezvous(libp2p::rendezvous::Event::Registered { .. })) => {
SwarmEvent::Behaviour(OutEvent::Rendezvous(libp2p::rendezvous::client::Event::Registered { .. })) => {
tracing::info!("Successfully registered with rendezvous node");
}
SwarmEvent::Behaviour(OutEvent::Rendezvous(libp2p::rendezvous::Event::RegisterFailed(error))) => {
tracing::error!("Registration with rendezvous node failed: {:#}", error);
SwarmEvent::Behaviour(OutEvent::Rendezvous(libp2p::rendezvous::client::Event::RegisterFailed(error))) => {
tracing::error!("Registration with rendezvous node failed: {:?}", error);
}
SwarmEvent::Behaviour(OutEvent::Failure {peer, error}) => {
tracing::error!(
@ -281,7 +281,7 @@ where
SwarmEvent::ConnectionClosed { peer_id: peer, num_established, endpoint, cause: None } if num_established == 0 => {
tracing::info!(%peer, address = %endpoint.get_remote_address(), "Successfully closed connection");
}
SwarmEvent::NewListenAddr(address) => {
SwarmEvent::NewListenAddr{address, ..} => {
tracing::info!(%address, "New listen address reported");
}
_ => {}

View file

@ -72,7 +72,7 @@ pub mod behaviour {
channel: ResponseChannel<()>,
peer: PeerId,
},
Rendezvous(libp2p::rendezvous::Event),
Rendezvous(libp2p::rendezvous::client::Event),
Failure {
peer: PeerId,
error: Error,
@ -163,8 +163,8 @@ pub mod behaviour {
}
}
impl From<libp2p::rendezvous::Event> for OutEvent {
fn from(event: libp2p::rendezvous::Event) -> Self {
impl From<libp2p::rendezvous::client::Event> for OutEvent {
fn from(event: libp2p::rendezvous::client::Event) -> Self {
OutEvent::Rendezvous(event)
}
}
@ -172,6 +172,7 @@ pub mod behaviour {
pub mod rendezous {
use super::*;
use libp2p::swarm::DialError;
use std::pin::Pin;
#[derive(PartialEq)]
@ -190,7 +191,7 @@ pub mod rendezous {
}
pub struct Behaviour {
inner: libp2p::rendezvous::Rendezvous,
inner: libp2p::rendezvous::client::Behaviour,
rendezvous_point: Multiaddr,
rendezvous_peer_id: PeerId,
namespace: XmrBtcNamespace,
@ -208,10 +209,7 @@ pub mod rendezous {
registration_ttl: Option<u64>,
) -> Self {
Self {
inner: libp2p::rendezvous::Rendezvous::new(
identity,
libp2p::rendezvous::Config::default(),
),
inner: libp2p::rendezvous::client::Behaviour::new(identity),
rendezvous_point: rendezvous_address,
rendezvous_peer_id,
namespace,
@ -232,8 +230,8 @@ pub mod rendezous {
impl NetworkBehaviour for Behaviour {
type ProtocolsHandler =
<libp2p::rendezvous::Rendezvous as NetworkBehaviour>::ProtocolsHandler;
type OutEvent = libp2p::rendezvous::Event;
<libp2p::rendezvous::client::Behaviour as NetworkBehaviour>::ProtocolsHandler;
type OutEvent = libp2p::rendezvous::client::Event;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
self.inner.new_handler()
@ -277,14 +275,23 @@ pub mod rendezous {
self.inner.inject_event(peer_id, connection, event)
}
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
fn inject_dial_failure(
&mut self,
peer_id: &PeerId,
_handler: Self::ProtocolsHandler,
_error: DialError,
) {
if peer_id == &self.rendezvous_peer_id {
self.connection_status = ConnectionStatus::Disconnected;
}
}
#[allow(clippy::type_complexity)]
fn poll(&mut self, cx: &mut std::task::Context<'_>, params: &mut impl PollParameters) -> Poll<NetworkBehaviourAction<<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>>{
fn poll(
&mut self,
cx: &mut std::task::Context<'_>,
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
match &mut self.registration_status {
RegistrationStatus::RegisterOnNextConnection => match self.connection_status {
ConnectionStatus::Disconnected => {
@ -293,6 +300,7 @@ pub mod rendezous {
return Poll::Ready(NetworkBehaviourAction::DialPeer {
peer_id: self.rendezvous_peer_id,
condition: DialPeerCondition::Disconnected,
handler: Self::ProtocolsHandler::new(Duration::from_secs(30)),
});
}
ConnectionStatus::Dialling => {}
@ -315,6 +323,7 @@ pub mod rendezous {
return Poll::Ready(NetworkBehaviourAction::DialPeer {
peer_id: self.rendezvous_peer_id,
condition: DialPeerCondition::Disconnected,
handler: Self::ProtocolsHandler::new(Duration::from_secs(30)),
});
}
ConnectionStatus::Dialling => {}
@ -328,7 +337,7 @@ pub mod rendezous {
// reset the timer if we successfully registered
if let Poll::Ready(NetworkBehaviourAction::GenerateEvent(
libp2p::rendezvous::Event::Registered { ttl, .. },
libp2p::rendezvous::client::Event::Registered { ttl, .. },
)) = &inner_poll
{
let half_of_ttl = Duration::from_secs(*ttl) / 2;
@ -347,13 +356,14 @@ pub mod rendezous {
use super::*;
use crate::network::test::{new_swarm, SwarmExt};
use futures::StreamExt;
use libp2p::rendezvous;
use libp2p::swarm::SwarmEvent;
#[tokio::test]
async fn given_no_initial_connection_when_constructed_asb_connects_and_registers_with_rendezvous_node(
) {
let mut rendezvous_node = new_swarm(|_, identity| {
libp2p::rendezvous::Rendezvous::new(identity, libp2p::rendezvous::Config::default())
let mut rendezvous_node = new_swarm(|_, _| {
rendezvous::server::Behaviour::new(rendezvous::server::Config::default())
});
let rendezvous_address = rendezvous_node.listen_on_random_memory_address().await;
@ -375,7 +385,7 @@ pub mod rendezous {
});
let asb_registered = tokio::spawn(async move {
loop {
if let SwarmEvent::Behaviour(libp2p::rendezvous::Event::Registered { .. }) =
if let SwarmEvent::Behaviour(rendezvous::client::Event::Registered { .. }) =
asb.select_next_some().await
{
break;
@ -391,11 +401,9 @@ pub mod rendezous {
#[tokio::test]
async fn asb_automatically_re_registers() {
let min_ttl = 5;
let mut rendezvous_node = new_swarm(|_, identity| {
libp2p::rendezvous::Rendezvous::new(
identity,
libp2p::rendezvous::Config::default().with_min_ttl(min_ttl),
let mut rendezvous_node = new_swarm(|_, _| {
rendezvous::server::Behaviour::new(
rendezvous::server::Config::default().with_min_ttl(2),
)
});
let rendezvous_address = rendezvous_node.listen_on_random_memory_address().await;
@ -420,7 +428,7 @@ pub mod rendezous {
let mut number_of_registrations = 0;
loop {
if let SwarmEvent::Behaviour(libp2p::rendezvous::Event::Registered { .. }) =
if let SwarmEvent::Behaviour(rendezvous::client::Event::Registered { .. }) =
asb.select_next_some().await
{
number_of_registrations += 1

View file

@ -26,7 +26,7 @@ mod tests {
use libp2p::multiaddr::Protocol;
use libp2p::request_response::RequestResponseEvent;
use libp2p::swarm::{AddressScore, NetworkBehaviourEventProcess};
use libp2p::{identity, Multiaddr, PeerId};
use libp2p::{identity, rendezvous, Multiaddr, PeerId};
use std::collections::HashSet;
use std::iter::FromIterator;
use std::time::Duration;
@ -59,11 +59,8 @@ mod tests {
}
async fn setup_rendezvous_point() -> (Multiaddr, PeerId) {
let mut rendezvous_node = new_swarm(|_, identity| RendezvousPointBehaviour {
rendezvous: libp2p::rendezvous::Rendezvous::new(
identity,
libp2p::rendezvous::Config::default(),
),
let mut rendezvous_node = new_swarm(|_, _| RendezvousPointBehaviour {
rendezvous: rendezvous::server::Behaviour::new(rendezvous::server::Config::default()),
ping: Default::default(),
});
let rendezvous_address = rendezvous_node.listen_on_tcp_localhost().await;
@ -127,6 +124,7 @@ mod tests {
}
#[derive(libp2p::NetworkBehaviour)]
#[behaviour(event_process = true)]
struct StaticQuoteAsbBehaviour {
rendezvous: asb::rendezous::Behaviour,
// Support `Ping` as a workaround until https://github.com/libp2p/rust-libp2p/issues/2109 is fixed.
@ -138,14 +136,14 @@ mod tests {
#[behaviour(ignore)]
registered: bool,
}
impl NetworkBehaviourEventProcess<libp2p::rendezvous::Event> for StaticQuoteAsbBehaviour {
fn inject_event(&mut self, event: libp2p::rendezvous::Event) {
if let libp2p::rendezvous::Event::Registered { .. } = event {
impl NetworkBehaviourEventProcess<rendezvous::client::Event> for StaticQuoteAsbBehaviour {
fn inject_event(&mut self, event: rendezvous::client::Event) {
if let rendezvous::client::Event::Registered { .. } = event {
self.registered = true;
}
}
}
impl NetworkBehaviourEventProcess<libp2p::ping::PingEvent> for StaticQuoteAsbBehaviour {
fn inject_event(&mut self, _: libp2p::ping::PingEvent) {}
}
@ -164,14 +162,15 @@ mod tests {
}
#[derive(libp2p::NetworkBehaviour)]
#[behaviour(event_process = true)]
struct RendezvousPointBehaviour {
rendezvous: libp2p::rendezvous::Rendezvous,
rendezvous: rendezvous::server::Behaviour,
// Support `Ping` as a workaround until https://github.com/libp2p/rust-libp2p/issues/2109 is fixed.
ping: libp2p::ping::Ping,
}
impl NetworkBehaviourEventProcess<libp2p::rendezvous::Event> for RendezvousPointBehaviour {
fn inject_event(&mut self, _: libp2p::rendezvous::Event) {}
impl NetworkBehaviourEventProcess<rendezvous::server::Event> for RendezvousPointBehaviour {
fn inject_event(&mut self, _: rendezvous::server::Event) {}
}
impl NetworkBehaviourEventProcess<libp2p::ping::PingEvent> for RendezvousPointBehaviour {
fn inject_event(&mut self, _: libp2p::ping::PingEvent) {}

View file

@ -5,7 +5,6 @@ use anyhow::{Context, Result};
use futures::StreamExt;
use libp2p::multiaddr::Protocol;
use libp2p::ping::{Ping, PingConfig, PingEvent};
use libp2p::rendezvous::{Namespace, Rendezvous};
use libp2p::request_response::{RequestResponseEvent, RequestResponseMessage};
use libp2p::swarm::SwarmEvent;
use libp2p::{identity, rendezvous, Multiaddr, PeerId, Swarm};
@ -29,7 +28,7 @@ pub async fn list_sellers(
identity: identity::Keypair,
) -> Result<Vec<Seller>> {
let behaviour = Behaviour {
rendezvous: Rendezvous::new(identity.clone(), rendezvous::Config::default()),
rendezvous: rendezvous::client::Behaviour::new(identity.clone()),
quote: quote::cli(),
ping: Ping::new(
PingConfig::new()
@ -74,13 +73,13 @@ pub enum Status {
#[derive(Debug)]
enum OutEvent {
Rendezvous(rendezvous::Event),
Rendezvous(rendezvous::client::Event),
Quote(quote::OutEvent),
Ping(PingEvent),
}
impl From<rendezvous::Event> for OutEvent {
fn from(event: rendezvous::Event) -> Self {
impl From<rendezvous::client::Event> for OutEvent {
fn from(event: rendezvous::client::Event) -> Self {
OutEvent::Rendezvous(event)
}
}
@ -95,7 +94,7 @@ impl From<quote::OutEvent> for OutEvent {
#[behaviour(event_process = false)]
#[behaviour(out_event = "OutEvent")]
struct Behaviour {
rendezvous: Rendezvous,
rendezvous: rendezvous::client::Behaviour,
quote: quote::Behaviour,
ping: Ping,
}
@ -155,7 +154,7 @@ impl EventLoop {
);
self.swarm.behaviour_mut().rendezvous.discover(
Some(Namespace::new(self.namespace.to_string()).expect("our namespace to be a correct string")),
Some(rendezvous::Namespace::new(self.namespace.to_string()).expect("our namespace to be a correct string")),
None,
None,
self.rendezvous_peer_id,
@ -194,7 +193,7 @@ impl EventLoop {
}
}
SwarmEvent::Behaviour(OutEvent::Rendezvous(
rendezvous::Event::Discovered { registrations, .. },
libp2p::rendezvous::client::Event::Discovered { registrations, .. },
)) => {
self.state = State::WaitForQuoteCompletion;

View file

@ -1,7 +1,6 @@
use async_trait::async_trait;
use futures::prelude::*;
use libp2p::core::upgrade;
use libp2p::core::upgrade::ReadOneError;
use libp2p::request_response::{ProtocolName, RequestResponseCodec};
use serde::de::DeserializeOwned;
use serde::Serialize;
@ -40,10 +39,7 @@ where
where
T: AsyncRead + Unpin + Send,
{
let message = upgrade::read_one(io, BUF_SIZE).await.map_err(|e| match e {
ReadOneError::Io(err) => err,
e => io::Error::new(io::ErrorKind::Other, e),
})?;
let message = upgrade::read_length_prefixed(io, BUF_SIZE).await?;
let mut de = serde_cbor::Deserializer::from_slice(&message);
let msg = Req::deserialize(&mut de)
.map_err(|error| io::Error::new(io::ErrorKind::Other, error))?;
@ -59,9 +55,7 @@ where
where
T: AsyncRead + Unpin + Send,
{
let message = upgrade::read_one(io, BUF_SIZE)
.await
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
let message = upgrade::read_length_prefixed(io, BUF_SIZE).await?;
let mut de = serde_cbor::Deserializer::from_slice(&message);
let msg = Res::deserialize(&mut de)
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
@ -81,7 +75,7 @@ where
let bytes =
serde_cbor::to_vec(&req).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
upgrade::write_one(io, &bytes).await?;
upgrade::write_length_prefixed(io, &bytes).await?;
Ok(())
}
@ -97,7 +91,7 @@ where
{
let bytes = serde_cbor::to_vec(&res)
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
upgrade::write_one(io, &bytes).await?;
upgrade::write_length_prefixed(io, &bytes).await?;
Ok(())
}

View file

@ -55,7 +55,7 @@ where
where
T: AsyncRead + Unpin + Send,
{
let message = upgrade::read_one(io, BUF_SIZE)
let message = upgrade::read_length_prefixed(io, BUF_SIZE)
.await
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
let mut de = serde_json::Deserializer::from_slice(&message);
@ -88,7 +88,7 @@ where
{
let bytes = serde_json::to_vec(&res)
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
upgrade::write_one(io, &bytes).await?;
upgrade::write_length_prefixed(io, &bytes).await?;
Ok(())
}

View file

@ -93,7 +93,7 @@ impl NetworkBehaviour for Behaviour {
&mut self,
cx: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Void, Self::OutEvent>> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
let sleep = match self.sleep.as_mut() {
None => return Poll::Pending, // early exit if we shouldn't be re-dialling
Some(future) => future,
@ -115,6 +115,7 @@ impl NetworkBehaviour for Behaviour {
Poll::Ready(NetworkBehaviourAction::DialPeer {
peer_id: self.peer,
condition: DialPeerCondition::Disconnected,
handler: Self::ProtocolsHandler::default(),
})
}
}

View file

@ -90,7 +90,7 @@ pub async fn read_cbor_message<T>(substream: &mut NegotiatedSubstream) -> Result
where
T: DeserializeOwned,
{
let bytes = upgrade::read_one(substream, BUF_SIZE)
let bytes = upgrade::read_length_prefixed(substream, BUF_SIZE)
.await
.context("Failed to read length-prefixed message from stream")?;
let mut de = serde_cbor::Deserializer::from_slice(&bytes);
@ -106,7 +106,7 @@ where
{
let bytes =
serde_cbor::to_vec(&message).context("Failed to serialize message as bytes using CBOR")?;
upgrade::write_with_len_prefix(substream, &bytes)
upgrade::write_length_prefixed(substream, &bytes)
.await
.context("Failed to write bytes as length-prefixed message")?;

View file

@ -185,7 +185,7 @@ where
&mut self,
_cx: &mut std::task::Context<'_>,
_params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<(), Self::OutEvent>> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
@ -234,6 +234,7 @@ impl<LR> Handler<LR> {
}
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum HandlerOutEvent {
Initiated(bmrng::RequestReceiver<bitcoin::Amount, WalletSnapshot>),
Completed(Result<(Uuid, State3)>),

View file

@ -76,7 +76,7 @@ impl NetworkBehaviour for Behaviour {
&mut self,
_cx: &mut Context<'_>,
_params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<NewSwap, Self::OutEvent>> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
if let Some((_, event)) = self.completed_swaps.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
@ -126,6 +126,7 @@ pub struct NewSwap {
pub bitcoin_refund_address: bitcoin::Address,
}
#[derive(Debug)]
pub struct Completed(Result<State2>);
impl ProtocolsHandler for Handler {

View file

@ -43,7 +43,7 @@ where
);
let transport = asb::transport::new(&identity)?;
let peer_id = identity.public().into_peer_id();
let peer_id = identity.public().into();
let swarm = SwarmBuilder::new(transport, behaviour, peer_id)
.executor(Box::new(|f| {
@ -68,7 +68,7 @@ where
};
let transport = cli::transport::new(&identity, maybe_tor_socks5_port)?;
let peer_id = identity.public().into_peer_id();
let peer_id = identity.public().into();
let swarm = SwarmBuilder::new(transport, behaviour, peer_id)
.executor(Box::new(|f| {

View file

@ -203,7 +203,7 @@ where
{
loop {
match swarm.select_next_some().await {
SwarmEvent::NewListenAddr(addr) if &addr == multiaddr => {
SwarmEvent::NewListenAddr { address, .. } if &address == multiaddr => {
break;
}
other => {