diff --git a/swap/src/bin/swap.rs b/swap/src/bin/swap.rs index f815fb02..ed3cc13d 100644 --- a/swap/src/bin/swap.rs +++ b/swap/src/bin/swap.rs @@ -76,7 +76,7 @@ async fn main() -> Result<()> { init_monero_wallet(data_dir, monero_daemon_host, env_config).await?; let bitcoin_wallet = Arc::new(bitcoin_wallet); - let mut swarm = swarm::bob(&seed)?; + let mut swarm = swarm::bob(&seed, alice_peer_id)?; swarm.add_address(alice_peer_id, alice_multiaddr); let swap_id = Uuid::new_v4(); @@ -171,7 +171,7 @@ async fn main() -> Result<()> { let bitcoin_wallet = Arc::new(bitcoin_wallet); let alice_peer_id = db.get_peer_id(swap_id)?; - let mut swarm = swarm::bob(&seed)?; + let mut swarm = swarm::bob(&seed, alice_peer_id)?; swarm.add_address(alice_peer_id, alice_multiaddr); let (event_loop, event_loop_handle) = diff --git a/swap/src/network.rs b/swap/src/network.rs index 4a3fb29e..344e64ae 100644 --- a/swap/src/network.rs +++ b/swap/src/network.rs @@ -1,6 +1,7 @@ pub mod cbor_request_response; pub mod encrypted_signature; pub mod quote; +pub mod redial; pub mod spot_price; pub mod swarm; pub mod transfer_proof; diff --git a/swap/src/network/redial.rs b/swap/src/network/redial.rs new file mode 100644 index 00000000..0341b87e --- /dev/null +++ b/swap/src/network/redial.rs @@ -0,0 +1,119 @@ +use backoff::backoff::Backoff; +use backoff::ExponentialBackoff; +use futures::future::FutureExt; +use libp2p::core::connection::ConnectionId; +use libp2p::core::Multiaddr; +use libp2p::swarm::protocols_handler::DummyProtocolsHandler; +use libp2p::swarm::{DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; +use libp2p::PeerId; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; +use tokio::time::{Instant, Sleep}; +use void::Void; + +pub enum OutEvent { + AllAttemptsExhausted { peer: PeerId }, +} + +/// A [`NetworkBehaviour`] that tracks whether we are connected to the given +/// peer and attempts to re-establish a connection with an exponential backoff +/// if we lose the connection. +pub struct Behaviour { + /// The peer we are interested in. + peer: PeerId, + /// If present, tracks for how long we need to sleep until we dial again. + sleep: Option>>, + /// Tracks the current backoff state. + backoff: ExponentialBackoff, +} + +impl Behaviour { + pub fn new(peer: PeerId, interval: Duration) -> Self { + Self { + peer, + sleep: None, + backoff: ExponentialBackoff { + initial_interval: interval, + current_interval: interval, + // give up dialling after 5 minutes + max_elapsed_time: Some(Duration::from_secs(5 * 60)), + ..ExponentialBackoff::default() + }, + } + } + + pub fn until_next_redial(&self) -> Option { + let until_next_redial = self + .sleep + .as_ref()? + .deadline() + .checked_duration_since(Instant::now())?; + + Some(until_next_redial) + } +} + +impl NetworkBehaviour for Behaviour { + type ProtocolsHandler = DummyProtocolsHandler; + type OutEvent = OutEvent; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + DummyProtocolsHandler::default() + } + + fn addresses_of_peer(&mut self, _: &PeerId) -> Vec { + Vec::new() + } + + fn inject_connected(&mut self, peer_id: &PeerId) { + if peer_id != &self.peer { + return; + } + + // established a connection to the desired peer, cancel any active re-dialling + self.sleep = None; + } + + fn inject_disconnected(&mut self, peer_id: &PeerId) { + if peer_id != &self.peer { + return; + } + + // lost connection to the configured peer, trigger re-dialling with an + // exponential backoff + self.backoff.reset(); + self.sleep = Some(Box::pin(tokio::time::sleep(self.backoff.initial_interval))); + } + + fn inject_event(&mut self, _: PeerId, _: ConnectionId, _: Void) {} + + fn poll( + &mut self, + cx: &mut Context<'_>, + _: &mut impl PollParameters, + ) -> Poll> { + let sleep = match self.sleep.as_mut() { + None => return Poll::Pending, // early exit if we shouldn't be re-dialling + Some(future) => future, + }; + + futures::ready!(sleep.poll_unpin(cx)); + + let next_dial_in = match self.backoff.next_backoff() { + Some(next_dial_in) => next_dial_in, + None => { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent( + OutEvent::AllAttemptsExhausted { peer: self.peer }, + )); + } + }; + + self.sleep = Some(Box::pin(tokio::time::sleep(next_dial_in))); + + Poll::Ready(NetworkBehaviourAction::DialPeer { + peer_id: self.peer, + condition: DialPeerCondition::Disconnected, + }) + } +} diff --git a/swap/src/network/swarm.rs b/swap/src/network/swarm.rs index 220d8156..9d0bea28 100644 --- a/swap/src/network/swarm.rs +++ b/swap/src/network/swarm.rs @@ -3,23 +3,21 @@ use crate::protocol::{alice, bob}; use crate::seed::Seed; use anyhow::Result; use libp2p::swarm::{NetworkBehaviour, SwarmBuilder}; -use libp2p::Swarm; +use libp2p::{PeerId, Swarm}; pub fn alice(seed: &Seed) -> Result> { - new(seed) + new(seed, alice::Behaviour::default()) } -pub fn bob(seed: &Seed) -> Result> { - new(seed) +pub fn bob(seed: &Seed, alice: PeerId) -> Result> { + new(seed, bob::Behaviour::new(alice)) } -fn new(seed: &Seed) -> Result> +fn new(seed: &Seed, behaviour: B) -> Result> where - B: NetworkBehaviour + Default, + B: NetworkBehaviour, { let identity = seed.derive_libp2p_identity(); - - let behaviour = B::default(); let transport = transport::build(&identity)?; let swarm = SwarmBuilder::new(transport, behaviour, identity.public().into_peer_id()) diff --git a/swap/src/network/transport.rs b/swap/src/network/transport.rs index dc12cda3..f07e612b 100644 --- a/swap/src/network/transport.rs +++ b/swap/src/network/transport.rs @@ -7,6 +7,7 @@ use libp2p::dns::TokioDnsConfig; use libp2p::mplex::MplexConfig; use libp2p::noise::{self, NoiseConfig, X25519Spec}; use libp2p::{yamux, PeerId}; +use std::time::Duration; /// Builds a libp2p transport with the following features: /// - TcpConnection @@ -29,6 +30,7 @@ pub fn build(id_keys: &identity::Keypair) -> Result { yamux::YamuxConfig::default(), MplexConfig::new(), )) + .timeout(Duration::from_secs(20)) .map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer))) .boxed(); diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 930a435c..33f1eaaf 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -247,6 +247,9 @@ where } } } + SwarmEvent::NewListenAddr(addr) => { + tracing::info!("Listening on {}", addr); + } _ => {} } }, diff --git a/swap/src/protocol/bob.rs b/swap/src/protocol/bob.rs index bb411560..c913bd90 100644 --- a/swap/src/protocol/bob.rs +++ b/swap/src/protocol/bob.rs @@ -1,6 +1,7 @@ use crate::database::Database; use crate::env::Config; -use crate::network::{encrypted_signature, spot_price}; +use crate::network::quote::BidQuote; +use crate::network::{encrypted_signature, quote, redial, spot_price, transfer_proof}; use crate::protocol::bob; use crate::{bitcoin, monero}; use anyhow::{anyhow, Error, Result}; @@ -17,8 +18,7 @@ pub use self::event_loop::{EventLoop, EventLoopHandle}; pub use self::refund::refund; pub use self::state::*; pub use self::swap::{run, run_until}; -use crate::network::quote::BidQuote; -use crate::network::{quote, transfer_proof}; +use std::time::Duration; pub mod cancel; pub mod event_loop; @@ -125,6 +125,9 @@ pub enum OutEvent { EncryptedSignatureAcknowledged { id: RequestId, }, + AllRedialAttemptsExhausted { + peer: PeerId, + }, ResponseSent, // Same variant is used for all messages as no processing is done CommunicationError(Error), } @@ -218,6 +221,16 @@ impl From for OutEvent { } } +impl From for OutEvent { + fn from(event: redial::OutEvent) -> Self { + match event { + redial::OutEvent::AllAttemptsExhausted { peer } => { + OutEvent::AllRedialAttemptsExhausted { peer } + } + } + } +} + fn map_rr_event_to_outevent(event: RequestResponseEvent) -> OutEvent where OutEvent: From>, @@ -258,21 +271,21 @@ pub struct Behaviour { pub execution_setup: execution_setup::Behaviour, pub transfer_proof: transfer_proof::Behaviour, pub encrypted_signature: encrypted_signature::Behaviour, + pub redial: redial::Behaviour, } -impl Default for Behaviour { - fn default() -> Self { +impl Behaviour { + pub fn new(alice: PeerId) -> Self { Self { quote: quote::bob(), spot_price: spot_price::bob(), execution_setup: Default::default(), transfer_proof: transfer_proof::bob(), encrypted_signature: encrypted_signature::bob(), + redial: redial::Behaviour::new(alice, Duration::from_secs(2)), } } -} -impl Behaviour { /// Add a known address for the given peer pub fn add_address(&mut self, peer_id: PeerId, address: Multiaddr) { self.quote.add_address(&peer_id, address.clone()); diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index 35daa041..0a93d7b3 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -89,7 +89,13 @@ impl EventLoop { } pub async fn run(mut self) { - let _ = Swarm::dial(&mut self.swarm, &self.alice_peer_id); + match libp2p::Swarm::dial(&mut self.swarm, &self.alice_peer_id) { + Ok(()) => {} + Err(e) => { + tracing::error!("Failed to initiate dial to Alice: {}", e); + return; + } + } loop { // Note: We are making very elaborate use of `select!` macro's feature here. Make sure to read the documentation thoroughly: https://docs.rs/tokio/1.4.0/tokio/macro.select.html @@ -141,6 +147,10 @@ impl EventLoop { let _ = responder.respond(()); } } + SwarmEvent::Behaviour(OutEvent::AllRedialAttemptsExhausted { peer }) if peer == self.alice_peer_id => { + tracing::error!("Exhausted all re-dial attempts to Alice"); + return; + } SwarmEvent::Behaviour(OutEvent::ResponseSent) => { } @@ -149,7 +159,7 @@ impl EventLoop { return; } SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } if peer_id == self.alice_peer_id => { - tracing::debug!("Connected to Alice at {}", endpoint.get_remote_address()); + tracing::info!("Connected to Alice at {}", endpoint.get_remote_address()); } SwarmEvent::Dialing(peer_id) if peer_id == self.alice_peer_id => { tracing::debug!("Dialling Alice at {}", peer_id); @@ -165,16 +175,13 @@ impl EventLoop { return; } } - match libp2p::Swarm::dial(&mut self.swarm, &self.alice_peer_id) { - Ok(()) => {}, - Err(e) => { - tracing::warn!("Failed to re-dial Alice: {}", e); - return; - } - } } SwarmEvent::UnreachableAddr { peer_id, address, attempts_remaining, error } if peer_id == self.alice_peer_id && attempts_remaining == 0 => { - tracing::warn!("Failed to dial Alice at {}: {}", address, error); + tracing::warn!(%address, "Failed to dial Alice: {}", error); + + if let Some(duration) = self.swarm.redial.until_next_redial() { + tracing::info!("Next redial attempt in {}s", duration.as_secs()); + } } _ => {} } diff --git a/swap/tests/harness/mod.rs b/swap/tests/harness/mod.rs index ada61805..cdad8af8 100644 --- a/swap/tests/harness/mod.rs +++ b/swap/tests/harness/mod.rs @@ -116,7 +116,7 @@ impl BobParams { } pub fn new_eventloop(&self, swap_id: Uuid) -> Result<(bob::EventLoop, bob::EventLoopHandle)> { - let mut swarm = swarm::bob(&self.seed)?; + let mut swarm = swarm::bob(&self.seed, self.alice_peer_id)?; swarm.add_address(self.alice_peer_id, self.alice_address.clone()); bob::EventLoop::new(