Introduce a redial::Behaviour

This behaviour makes Bob re-dial Alice with an exponential backoff as
soon as the connection is lost.
This commit is contained in:
Thomas Eizinger 2021-04-09 11:50:46 +10:00
parent d4c10a1292
commit f0f7288bb6
No known key found for this signature in database
GPG Key ID: 651AC83A6C6C8B96
9 changed files with 171 additions and 28 deletions

View File

@ -76,7 +76,7 @@ async fn main() -> Result<()> {
init_monero_wallet(data_dir, monero_daemon_host, env_config).await?; init_monero_wallet(data_dir, monero_daemon_host, env_config).await?;
let bitcoin_wallet = Arc::new(bitcoin_wallet); let bitcoin_wallet = Arc::new(bitcoin_wallet);
let mut swarm = swarm::bob(&seed)?; let mut swarm = swarm::bob(&seed, alice_peer_id)?;
swarm.add_address(alice_peer_id, alice_multiaddr); swarm.add_address(alice_peer_id, alice_multiaddr);
let swap_id = Uuid::new_v4(); let swap_id = Uuid::new_v4();
@ -171,7 +171,7 @@ async fn main() -> Result<()> {
let bitcoin_wallet = Arc::new(bitcoin_wallet); let bitcoin_wallet = Arc::new(bitcoin_wallet);
let alice_peer_id = db.get_peer_id(swap_id)?; let alice_peer_id = db.get_peer_id(swap_id)?;
let mut swarm = swarm::bob(&seed)?; let mut swarm = swarm::bob(&seed, alice_peer_id)?;
swarm.add_address(alice_peer_id, alice_multiaddr); swarm.add_address(alice_peer_id, alice_multiaddr);
let (event_loop, event_loop_handle) = let (event_loop, event_loop_handle) =

View File

@ -1,6 +1,7 @@
pub mod cbor_request_response; pub mod cbor_request_response;
pub mod encrypted_signature; pub mod encrypted_signature;
pub mod quote; pub mod quote;
pub mod redial;
pub mod spot_price; pub mod spot_price;
pub mod swarm; pub mod swarm;
pub mod transfer_proof; pub mod transfer_proof;

119
swap/src/network/redial.rs Normal file
View File

@ -0,0 +1,119 @@
use backoff::backoff::Backoff;
use backoff::ExponentialBackoff;
use futures::future::FutureExt;
use libp2p::core::connection::ConnectionId;
use libp2p::core::Multiaddr;
use libp2p::swarm::protocols_handler::DummyProtocolsHandler;
use libp2p::swarm::{DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p::PeerId;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::time::{Instant, Sleep};
use void::Void;
pub enum OutEvent {
AllAttemptsExhausted { peer: PeerId },
}
/// A [`NetworkBehaviour`] that tracks whether we are connected to the given
/// peer and attempts to re-establish a connection with an exponential backoff
/// if we lose the connection.
pub struct Behaviour {
/// The peer we are interested in.
peer: PeerId,
/// If present, tracks for how long we need to sleep until we dial again.
sleep: Option<Pin<Box<Sleep>>>,
/// Tracks the current backoff state.
backoff: ExponentialBackoff,
}
impl Behaviour {
pub fn new(peer: PeerId, interval: Duration) -> Self {
Self {
peer,
sleep: None,
backoff: ExponentialBackoff {
initial_interval: interval,
current_interval: interval,
// give up dialling after 5 minutes
max_elapsed_time: Some(Duration::from_secs(5 * 60)),
..ExponentialBackoff::default()
},
}
}
pub fn until_next_redial(&self) -> Option<Duration> {
let until_next_redial = self
.sleep
.as_ref()?
.deadline()
.checked_duration_since(Instant::now())?;
Some(until_next_redial)
}
}
impl NetworkBehaviour for Behaviour {
type ProtocolsHandler = DummyProtocolsHandler;
type OutEvent = OutEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
DummyProtocolsHandler::default()
}
fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
Vec::new()
}
fn inject_connected(&mut self, peer_id: &PeerId) {
if peer_id != &self.peer {
return;
}
// established a connection to the desired peer, cancel any active re-dialling
self.sleep = None;
}
fn inject_disconnected(&mut self, peer_id: &PeerId) {
if peer_id != &self.peer {
return;
}
// lost connection to the configured peer, trigger re-dialling with an
// exponential backoff
self.backoff.reset();
self.sleep = Some(Box::pin(tokio::time::sleep(self.backoff.initial_interval)));
}
fn inject_event(&mut self, _: PeerId, _: ConnectionId, _: Void) {}
fn poll(
&mut self,
cx: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Void, Self::OutEvent>> {
let sleep = match self.sleep.as_mut() {
None => return Poll::Pending, // early exit if we shouldn't be re-dialling
Some(future) => future,
};
futures::ready!(sleep.poll_unpin(cx));
let next_dial_in = match self.backoff.next_backoff() {
Some(next_dial_in) => next_dial_in,
None => {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(
OutEvent::AllAttemptsExhausted { peer: self.peer },
));
}
};
self.sleep = Some(Box::pin(tokio::time::sleep(next_dial_in)));
Poll::Ready(NetworkBehaviourAction::DialPeer {
peer_id: self.peer,
condition: DialPeerCondition::Disconnected,
})
}
}

View File

@ -3,23 +3,21 @@ use crate::protocol::{alice, bob};
use crate::seed::Seed; use crate::seed::Seed;
use anyhow::Result; use anyhow::Result;
use libp2p::swarm::{NetworkBehaviour, SwarmBuilder}; use libp2p::swarm::{NetworkBehaviour, SwarmBuilder};
use libp2p::Swarm; use libp2p::{PeerId, Swarm};
pub fn alice(seed: &Seed) -> Result<Swarm<alice::Behaviour>> { pub fn alice(seed: &Seed) -> Result<Swarm<alice::Behaviour>> {
new(seed) new(seed, alice::Behaviour::default())
} }
pub fn bob(seed: &Seed) -> Result<Swarm<bob::Behaviour>> { pub fn bob(seed: &Seed, alice: PeerId) -> Result<Swarm<bob::Behaviour>> {
new(seed) new(seed, bob::Behaviour::new(alice))
} }
fn new<B>(seed: &Seed) -> Result<Swarm<B>> fn new<B>(seed: &Seed, behaviour: B) -> Result<Swarm<B>>
where where
B: NetworkBehaviour + Default, B: NetworkBehaviour,
{ {
let identity = seed.derive_libp2p_identity(); let identity = seed.derive_libp2p_identity();
let behaviour = B::default();
let transport = transport::build(&identity)?; let transport = transport::build(&identity)?;
let swarm = SwarmBuilder::new(transport, behaviour, identity.public().into_peer_id()) let swarm = SwarmBuilder::new(transport, behaviour, identity.public().into_peer_id())

View File

@ -7,6 +7,7 @@ use libp2p::dns::TokioDnsConfig;
use libp2p::mplex::MplexConfig; use libp2p::mplex::MplexConfig;
use libp2p::noise::{self, NoiseConfig, X25519Spec}; use libp2p::noise::{self, NoiseConfig, X25519Spec};
use libp2p::{yamux, PeerId}; use libp2p::{yamux, PeerId};
use std::time::Duration;
/// Builds a libp2p transport with the following features: /// Builds a libp2p transport with the following features:
/// - TcpConnection /// - TcpConnection
@ -29,6 +30,7 @@ pub fn build(id_keys: &identity::Keypair) -> Result<SwapTransport> {
yamux::YamuxConfig::default(), yamux::YamuxConfig::default(),
MplexConfig::new(), MplexConfig::new(),
)) ))
.timeout(Duration::from_secs(20))
.map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer))) .map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer)))
.boxed(); .boxed();

View File

@ -247,6 +247,9 @@ where
} }
} }
} }
SwarmEvent::NewListenAddr(addr) => {
tracing::info!("Listening on {}", addr);
}
_ => {} _ => {}
} }
}, },

View File

@ -1,6 +1,7 @@
use crate::database::Database; use crate::database::Database;
use crate::env::Config; use crate::env::Config;
use crate::network::{encrypted_signature, spot_price}; use crate::network::quote::BidQuote;
use crate::network::{encrypted_signature, quote, redial, spot_price, transfer_proof};
use crate::protocol::bob; use crate::protocol::bob;
use crate::{bitcoin, monero}; use crate::{bitcoin, monero};
use anyhow::{anyhow, Error, Result}; use anyhow::{anyhow, Error, Result};
@ -17,8 +18,7 @@ pub use self::event_loop::{EventLoop, EventLoopHandle};
pub use self::refund::refund; pub use self::refund::refund;
pub use self::state::*; pub use self::state::*;
pub use self::swap::{run, run_until}; pub use self::swap::{run, run_until};
use crate::network::quote::BidQuote; use std::time::Duration;
use crate::network::{quote, transfer_proof};
pub mod cancel; pub mod cancel;
pub mod event_loop; pub mod event_loop;
@ -125,6 +125,9 @@ pub enum OutEvent {
EncryptedSignatureAcknowledged { EncryptedSignatureAcknowledged {
id: RequestId, id: RequestId,
}, },
AllRedialAttemptsExhausted {
peer: PeerId,
},
ResponseSent, // Same variant is used for all messages as no processing is done ResponseSent, // Same variant is used for all messages as no processing is done
CommunicationError(Error), CommunicationError(Error),
} }
@ -218,6 +221,16 @@ impl From<encrypted_signature::OutEvent> for OutEvent {
} }
} }
impl From<redial::OutEvent> for OutEvent {
fn from(event: redial::OutEvent) -> Self {
match event {
redial::OutEvent::AllAttemptsExhausted { peer } => {
OutEvent::AllRedialAttemptsExhausted { peer }
}
}
}
}
fn map_rr_event_to_outevent<I, O>(event: RequestResponseEvent<I, O>) -> OutEvent fn map_rr_event_to_outevent<I, O>(event: RequestResponseEvent<I, O>) -> OutEvent
where where
OutEvent: From<RequestResponseMessage<I, O>>, OutEvent: From<RequestResponseMessage<I, O>>,
@ -258,21 +271,21 @@ pub struct Behaviour {
pub execution_setup: execution_setup::Behaviour, pub execution_setup: execution_setup::Behaviour,
pub transfer_proof: transfer_proof::Behaviour, pub transfer_proof: transfer_proof::Behaviour,
pub encrypted_signature: encrypted_signature::Behaviour, pub encrypted_signature: encrypted_signature::Behaviour,
pub redial: redial::Behaviour,
} }
impl Default for Behaviour { impl Behaviour {
fn default() -> Self { pub fn new(alice: PeerId) -> Self {
Self { Self {
quote: quote::bob(), quote: quote::bob(),
spot_price: spot_price::bob(), spot_price: spot_price::bob(),
execution_setup: Default::default(), execution_setup: Default::default(),
transfer_proof: transfer_proof::bob(), transfer_proof: transfer_proof::bob(),
encrypted_signature: encrypted_signature::bob(), encrypted_signature: encrypted_signature::bob(),
} redial: redial::Behaviour::new(alice, Duration::from_secs(2)),
} }
} }
impl Behaviour {
/// Add a known address for the given peer /// Add a known address for the given peer
pub fn add_address(&mut self, peer_id: PeerId, address: Multiaddr) { pub fn add_address(&mut self, peer_id: PeerId, address: Multiaddr) {
self.quote.add_address(&peer_id, address.clone()); self.quote.add_address(&peer_id, address.clone());

View File

@ -89,7 +89,13 @@ impl EventLoop {
} }
pub async fn run(mut self) { pub async fn run(mut self) {
let _ = Swarm::dial(&mut self.swarm, &self.alice_peer_id); match libp2p::Swarm::dial(&mut self.swarm, &self.alice_peer_id) {
Ok(()) => {}
Err(e) => {
tracing::error!("Failed to initiate dial to Alice: {}", e);
return;
}
}
loop { loop {
// Note: We are making very elaborate use of `select!` macro's feature here. Make sure to read the documentation thoroughly: https://docs.rs/tokio/1.4.0/tokio/macro.select.html // Note: We are making very elaborate use of `select!` macro's feature here. Make sure to read the documentation thoroughly: https://docs.rs/tokio/1.4.0/tokio/macro.select.html
@ -141,6 +147,10 @@ impl EventLoop {
let _ = responder.respond(()); let _ = responder.respond(());
} }
} }
SwarmEvent::Behaviour(OutEvent::AllRedialAttemptsExhausted { peer }) if peer == self.alice_peer_id => {
tracing::error!("Exhausted all re-dial attempts to Alice");
return;
}
SwarmEvent::Behaviour(OutEvent::ResponseSent) => { SwarmEvent::Behaviour(OutEvent::ResponseSent) => {
} }
@ -149,7 +159,7 @@ impl EventLoop {
return; return;
} }
SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } if peer_id == self.alice_peer_id => { SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } if peer_id == self.alice_peer_id => {
tracing::debug!("Connected to Alice at {}", endpoint.get_remote_address()); tracing::info!("Connected to Alice at {}", endpoint.get_remote_address());
} }
SwarmEvent::Dialing(peer_id) if peer_id == self.alice_peer_id => { SwarmEvent::Dialing(peer_id) if peer_id == self.alice_peer_id => {
tracing::debug!("Dialling Alice at {}", peer_id); tracing::debug!("Dialling Alice at {}", peer_id);
@ -165,16 +175,13 @@ impl EventLoop {
return; return;
} }
} }
match libp2p::Swarm::dial(&mut self.swarm, &self.alice_peer_id) {
Ok(()) => {},
Err(e) => {
tracing::warn!("Failed to re-dial Alice: {}", e);
return;
}
}
} }
SwarmEvent::UnreachableAddr { peer_id, address, attempts_remaining, error } if peer_id == self.alice_peer_id && attempts_remaining == 0 => { SwarmEvent::UnreachableAddr { peer_id, address, attempts_remaining, error } if peer_id == self.alice_peer_id && attempts_remaining == 0 => {
tracing::warn!("Failed to dial Alice at {}: {}", address, error); tracing::warn!(%address, "Failed to dial Alice: {}", error);
if let Some(duration) = self.swarm.redial.until_next_redial() {
tracing::info!("Next redial attempt in {}s", duration.as_secs());
}
} }
_ => {} _ => {}
} }

View File

@ -116,7 +116,7 @@ impl BobParams {
} }
pub fn new_eventloop(&self, swap_id: Uuid) -> Result<(bob::EventLoop, bob::EventLoopHandle)> { pub fn new_eventloop(&self, swap_id: Uuid) -> Result<(bob::EventLoop, bob::EventLoopHandle)> {
let mut swarm = swarm::bob(&self.seed)?; let mut swarm = swarm::bob(&self.seed, self.alice_peer_id)?;
swarm.add_address(self.alice_peer_id, self.alice_address.clone()); swarm.add_address(self.alice_peer_id, self.alice_address.clone());
bob::EventLoop::new( bob::EventLoop::new(