From cde3f0f74ad7b500db607057238d68b182d69a04 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 18 Mar 2021 18:00:02 +1100 Subject: [PATCH] Remove connection handling from swap execution The swap should not be concerned with connection handling. This is the responsibility of the overall application. All but the execution-setup NetworkBehaviour are `request-response` behaviours. These have built-in functionality to automatically emit a dial attempt in case we are not connected at the time we want to send a message. We remove all of the manual dialling code from the swap in favor of this behaviour. Additionally, we make sure to establish a connection as soon as the EventLoop gets started. In case we ever loose the connection to Alice, we try to re-establish it. --- bors.toml | 3 +- swap/src/bin/swap.rs | 5 +- swap/src/network.rs | 1 - swap/src/network/peer_tracker.rs | 126 ------------------ swap/src/protocol/alice/behaviour.rs | 15 +-- swap/src/protocol/alice/event_loop.rs | 3 - swap/src/protocol/bob.rs | 19 +-- swap/src/protocol/bob/event_loop.rs | 80 ++++++----- swap/src/protocol/bob/swap.rs | 9 -- ...fore_comm.rs => happy_path_restart_bob.rs} | 30 ++++- swap/tests/testutils/mod.rs | 3 +- 11 files changed, 79 insertions(+), 215 deletions(-) delete mode 100644 swap/src/network/peer_tracker.rs rename swap/tests/{happy_path_restart_bob_before_comm.rs => happy_path_restart_bob.rs} (51%) diff --git a/bors.toml b/bors.toml index 9375744a..9a5198e7 100644 --- a/bors.toml +++ b/bors.toml @@ -7,7 +7,8 @@ status = [ "test (x86_64-unknown-linux-gnu, ubuntu-latest)", "test (x86_64-apple-darwin, macos-latest)", "docker_tests (happy_path)", - "docker_tests (happy_path_restart_bob_before_comm)", + "docker_tests (happy_path_restart_bob_after_xmr_locked)", + "docker_tests (happy_path_restart_bob_before_xmr_locked)", "docker_tests (bob_refunds_using_cancel_and_refund_command)", "docker_tests (bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force)", "docker_tests (bob_refunds_using_cancel_and_refund_command_timelock_not_expired)", diff --git a/swap/src/bin/swap.rs b/swap/src/bin/swap.rs index da3d2a23..c1d6721f 100644 --- a/swap/src/bin/swap.rs +++ b/swap/src/bin/swap.rs @@ -146,8 +146,7 @@ async fn main() -> Result<()> { tokio::select! { result = event_loop => { result - .context("EventLoop panicked")? - .context("EventLoop failed")?; + .context("EventLoop panicked")?; }, result = bob::run(swap) => { result.context("Failed to complete swap")?; @@ -210,7 +209,7 @@ async fn main() -> Result<()> { tokio::select! { event_loop_result = handle => { - event_loop_result??; + event_loop_result?; }, swap_result = bob::run(swap) => { swap_result?; diff --git a/swap/src/network.rs b/swap/src/network.rs index f279fb50..4a3fb29e 100644 --- a/swap/src/network.rs +++ b/swap/src/network.rs @@ -1,6 +1,5 @@ pub mod cbor_request_response; pub mod encrypted_signature; -pub mod peer_tracker; pub mod quote; pub mod spot_price; pub mod swarm; diff --git a/swap/src/network/peer_tracker.rs b/swap/src/network/peer_tracker.rs deleted file mode 100644 index 36a536ce..00000000 --- a/swap/src/network/peer_tracker.rs +++ /dev/null @@ -1,126 +0,0 @@ -use futures::task::Context; -use libp2p::core::connection::ConnectionId; -use libp2p::core::ConnectedPoint; -use libp2p::swarm::protocols_handler::DummyProtocolsHandler; -use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; -use libp2p::{Multiaddr, PeerId}; -use std::collections::{HashMap, VecDeque}; -use std::task::Poll; - -#[derive(Debug, Copy, Clone)] -pub enum OutEvent { - ConnectionEstablished(PeerId), -} - -/// A NetworkBehaviour that tracks connections to the counterparty. Although the -/// libp2p `NetworkBehaviour` abstraction encompasses connections to multiple -/// peers we only ever connect to a single counterparty. Peer Tracker tracks -/// that connection. -#[derive(Default, Debug)] -pub struct Behaviour { - connected: Option<(PeerId, Multiaddr)>, - address_of_peer: HashMap, - events: VecDeque, -} - -impl Behaviour { - /// Return whether we are connected to the given peer. - pub fn is_connected(&self, peer_id: &PeerId) -> bool { - if let Some((connected_peer_id, _)) = &self.connected { - if connected_peer_id == peer_id { - return true; - } - } - false - } - - /// Returns the peer id of counterparty if we are connected. - pub fn counterparty_peer_id(&self) -> Option { - if let Some((id, _)) = &self.connected { - return Some(*id); - } - None - } - - /// Returns the peer_id and multiaddr of counterparty if we are connected. - pub fn counterparty(&self) -> Option<(PeerId, Multiaddr)> { - if let Some((peer_id, addr)) = &self.connected { - return Some((*peer_id, addr.clone())); - } - None - } - - /// Add an address for a given peer. We only store one address per peer. - pub fn add_address(&mut self, peer_id: PeerId, address: Multiaddr) { - self.address_of_peer.insert(peer_id, address); - } -} - -impl NetworkBehaviour for Behaviour { - type ProtocolsHandler = DummyProtocolsHandler; - type OutEvent = OutEvent; - - fn new_handler(&mut self) -> Self::ProtocolsHandler { - DummyProtocolsHandler::default() - } - - fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { - let mut addresses: Vec = vec![]; - - if let Some((counterparty_peer_id, addr)) = self.counterparty() { - if counterparty_peer_id == *peer_id { - addresses.push(addr) - } - } - - if let Some(addr) = self.address_of_peer.get(peer_id) { - addresses.push(addr.clone()); - } - - addresses - } - - fn inject_connected(&mut self, _: &PeerId) {} - - fn inject_disconnected(&mut self, _: &PeerId) {} - - fn inject_connection_established( - &mut self, - peer: &PeerId, - _: &ConnectionId, - point: &ConnectedPoint, - ) { - match point { - ConnectedPoint::Dialer { address } => { - self.connected = Some((*peer, address.clone())); - } - ConnectedPoint::Listener { - local_addr: _, - send_back_addr, - } => { - self.connected = Some((*peer, send_back_addr.clone())); - } - } - - self.events - .push_back(OutEvent::ConnectionEstablished(*peer)); - } - - fn inject_connection_closed(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) { - self.connected = None; - } - - fn inject_event(&mut self, _: PeerId, _: ConnectionId, _: void::Void) {} - - fn poll( - &mut self, - _: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll> { - if let Some(event) = self.events.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); - } - - Poll::Pending - } -} diff --git a/swap/src/protocol/alice/behaviour.rs b/swap/src/protocol/alice/behaviour.rs index c74e0356..58be2d50 100644 --- a/swap/src/protocol/alice/behaviour.rs +++ b/swap/src/protocol/alice/behaviour.rs @@ -1,6 +1,6 @@ use crate::env::Config; use crate::network::quote::BidQuote; -use crate::network::{encrypted_signature, peer_tracker, quote, spot_price, transfer_proof}; +use crate::network::{encrypted_signature, quote, spot_price, transfer_proof}; use crate::protocol::alice::{execution_setup, State0, State3}; use crate::{bitcoin, monero}; use anyhow::{anyhow, Error, Result}; @@ -11,7 +11,6 @@ use tracing::debug; #[derive(Debug)] pub enum OutEvent { - ConnectionEstablished(PeerId), SpotPriceRequested { request: spot_price::Request, channel: ResponseChannel, @@ -38,16 +37,6 @@ pub enum OutEvent { }, } -impl From for OutEvent { - fn from(event: peer_tracker::OutEvent) -> Self { - match event { - peer_tracker::OutEvent::ConnectionEstablished(id) => { - OutEvent::ConnectionEstablished(id) - } - } - } -} - impl OutEvent { fn unexpected_request(peer: PeerId) -> OutEvent { OutEvent::Failure { @@ -177,7 +166,6 @@ impl From for OutEvent { #[behaviour(out_event = "OutEvent", event_process = false)] #[allow(missing_debug_implementations)] pub struct Behaviour { - pt: peer_tracker::Behaviour, quote: quote::Behaviour, spot_price: spot_price::Behaviour, execution_setup: execution_setup::Behaviour, @@ -188,7 +176,6 @@ pub struct Behaviour { impl Default for Behaviour { fn default() -> Self { Self { - pt: Default::default(), quote: quote::alice(), spot_price: spot_price::alice(), execution_setup: Default::default(), diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 136453a7..b95b91fb 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -83,9 +83,6 @@ where tokio::select! { swarm_event = self.swarm.next_event() => { match swarm_event { - SwarmEvent::Behaviour(OutEvent::ConnectionEstablished(alice)) => { - debug!("Connection Established with {}", alice); - } SwarmEvent::Behaviour(OutEvent::SpotPriceRequested { request, channel, peer }) => { let btc = request.btc; let xmr = match self.handle_spot_price_request(btc, self.monero_wallet.clone()).await { diff --git a/swap/src/protocol/bob.rs b/swap/src/protocol/bob.rs index 3e5a7e0c..604d879a 100644 --- a/swap/src/protocol/bob.rs +++ b/swap/src/protocol/bob.rs @@ -1,6 +1,6 @@ use crate::database::Database; use crate::env::Config; -use crate::network::{encrypted_signature, peer_tracker, spot_price}; +use crate::network::{encrypted_signature, spot_price}; use crate::protocol::bob; use crate::{bitcoin, monero}; use anyhow::{anyhow, Error, Result}; @@ -175,16 +175,6 @@ impl From for OutEvent { } } -impl From for OutEvent { - fn from(event: peer_tracker::OutEvent) -> Self { - match event { - peer_tracker::OutEvent::ConnectionEstablished(id) => { - OutEvent::ConnectionEstablished(id) - } - } - } -} - impl From for OutEvent { fn from(event: spot_price::OutEvent) -> Self { map_rr_event_to_outevent(event) @@ -244,7 +234,6 @@ impl From for OutEvent { #[behaviour(out_event = "OutEvent", event_process = false)] #[allow(missing_debug_implementations)] pub struct Behaviour { - pt: peer_tracker::Behaviour, quote: quote::Behaviour, spot_price: spot_price::Behaviour, execution_setup: execution_setup::Behaviour, @@ -255,7 +244,6 @@ pub struct Behaviour { impl Default for Behaviour { fn default() -> Self { Self { - pt: Default::default(), quote: quote::bob(), spot_price: spot_price::bob(), execution_setup: Default::default(), @@ -296,6 +284,9 @@ impl Behaviour { /// Add a known address for the given peer pub fn add_address(&mut self, peer_id: PeerId, address: Multiaddr) { - self.pt.add_address(peer_id, address) + self.quote.add_address(&peer_id, address.clone()); + self.spot_price.add_address(&peer_id, address.clone()); + self.transfer_proof.add_address(&peer_id, address.clone()); + self.encrypted_signature.add_address(&peer_id, address); } } diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index fc40805a..9f55bf2a 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -3,14 +3,13 @@ use crate::network::quote::BidQuote; use crate::network::{spot_price, transfer_proof}; use crate::protocol::bob::{Behaviour, OutEvent, State0, State2}; use crate::{bitcoin, monero}; -use anyhow::{anyhow, bail, Context, Result}; +use anyhow::{anyhow, Result}; use futures::FutureExt; use libp2p::swarm::SwarmEvent; use libp2p::{PeerId, Swarm}; -use std::convert::Infallible; use std::sync::Arc; use tokio::sync::mpsc::{Receiver, Sender}; -use tracing::{debug, error, trace}; +use tracing::{debug, error}; #[derive(Debug)] pub struct Channels { @@ -36,8 +35,6 @@ pub struct EventLoopHandle { start_execution_setup: Sender, done_execution_setup: Receiver>, recv_transfer_proof: Receiver, - conn_established: Receiver, - dial_alice: Sender<()>, send_encrypted_signature: Sender, request_spot_price: Sender, recv_spot_price: Receiver, @@ -62,19 +59,6 @@ impl EventLoopHandle { .ok_or_else(|| anyhow!("Failed to receive transfer proof from Alice")) } - /// Dials other party and wait for the connection to be established. - /// Do nothing if we are already connected - pub async fn dial(&mut self) -> Result<()> { - let _ = self.dial_alice.send(()).await?; - - self.conn_established - .recv() - .await - .ok_or_else(|| anyhow!("Failed to receive connection established from Alice"))?; - - Ok(()) - } - pub async fn request_spot_price(&mut self, btc: bitcoin::Amount) -> Result { let _ = self .request_spot_price @@ -122,7 +106,6 @@ pub struct EventLoop { start_execution_setup: Receiver, done_execution_setup: Sender>, recv_transfer_proof: Sender, - dial_alice: Receiver<()>, conn_established: Sender, send_encrypted_signature: Receiver, request_quote: Receiver<()>, @@ -138,7 +121,6 @@ impl EventLoop { let start_execution_setup = Channels::new(); let done_execution_setup = Channels::new(); let recv_transfer_proof = Channels::new(); - let dial_alice = Channels::new(); let conn_established = Channels::new(); let send_encrypted_signature = Channels::new(); let request_spot_price = Channels::new(); @@ -154,7 +136,6 @@ impl EventLoop { done_execution_setup: done_execution_setup.sender, recv_transfer_proof: recv_transfer_proof.sender, conn_established: conn_established.sender, - dial_alice: dial_alice.receiver, send_encrypted_signature: send_encrypted_signature.receiver, request_spot_price: request_spot_price.receiver, recv_spot_price: recv_spot_price.sender, @@ -166,8 +147,6 @@ impl EventLoop { start_execution_setup: start_execution_setup.sender, done_execution_setup: done_execution_setup.receiver, recv_transfer_proof: recv_transfer_proof.receiver, - conn_established: conn_established.receiver, - dial_alice: dial_alice.sender, send_encrypted_signature: send_encrypted_signature.sender, request_spot_price: request_spot_price.sender, recv_spot_price: recv_spot_price.receiver, @@ -178,7 +157,9 @@ impl EventLoop { Ok((event_loop, handle)) } - pub async fn run(mut self) -> Result { + pub async fn run(mut self) { + let _ = Swarm::dial(&mut self.swarm, &self.alice_peer_id); + loop { tokio::select! { swarm_event = self.swarm.next_event().fuse() => { @@ -188,10 +169,10 @@ impl EventLoop { } SwarmEvent::Behaviour(OutEvent::SpotPriceReceived(msg)) => { let _ = self.recv_spot_price.send(msg).await; - }, + } SwarmEvent::Behaviour(OutEvent::QuoteReceived(msg)) => { let _ = self.recv_quote.send(msg).await; - }, + } SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone(res)) => { let _ = self.done_execution_setup.send(res.map(|state|*state)).await; } @@ -208,25 +189,42 @@ impl EventLoop { SwarmEvent::Behaviour(OutEvent::ResponseSent) => { } - SwarmEvent::Behaviour(OutEvent::CommunicationError(err)) => { - bail!(err.context("Communication error")) + SwarmEvent::Behaviour(OutEvent::CommunicationError(error)) => { + tracing::warn!("Communication error: {:#}", error); + return; + } + SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } if peer_id == self.alice_peer_id => { + tracing::debug!("Connected to Alice at {}", endpoint.get_remote_address()); + } + SwarmEvent::Dialing(peer_id) if peer_id == self.alice_peer_id => { + tracing::debug!("Dialling Alice at {}", peer_id); + } + SwarmEvent::ConnectionClosed { peer_id, endpoint, num_established, cause } if peer_id == self.alice_peer_id && num_established == 0 => { + match cause { + Some(error) => { + tracing::warn!("Lost connection to Alice at {}, cause: {}", endpoint.get_remote_address(), error); + }, + None => { + // no error means the disconnection was requested + tracing::info!("Successfully closed connection to Alice"); + return; + } + } + match libp2p::Swarm::dial(&mut self.swarm, &self.alice_peer_id) { + Ok(()) => {}, + Err(e) => { + tracing::warn!("Failed to re-dial Alice: {}", e); + return; + } + } + } + SwarmEvent::UnreachableAddr { peer_id, address, attempts_remaining, error } if peer_id == self.alice_peer_id && attempts_remaining == 0 => { + tracing::warn!("Failed to dial Alice at {}: {}", address, error); } _ => {} } }, - option = self.dial_alice.recv().fuse() => { - if option.is_some() { - let peer_id = self.alice_peer_id; - if self.swarm.pt.is_connected(&peer_id) { - trace!("Already connected to Alice at {}", peer_id); - let _ = self.conn_established.send(peer_id).await; - } else { - debug!("Dialing alice at {}", peer_id); - libp2p::Swarm::dial(&mut self.swarm, &peer_id).context("Failed to dial alice")?; - } - } - }, - spot_price_request = self.request_spot_price.recv().fuse() => { + spot_price_request = self.request_spot_price.recv().fuse() => { if let Some(request) = spot_price_request { self.swarm.request_spot_price(self.alice_peer_id, request); } diff --git a/swap/src/protocol/bob/swap.rs b/swap/src/protocol/bob/swap.rs index f13e5ffc..cb264750 100644 --- a/swap/src/protocol/bob/swap.rs +++ b/swap/src/protocol/bob/swap.rs @@ -69,8 +69,6 @@ async fn run_until_internal( BobState::Started { btc_amount } => { let bitcoin_refund_address = bitcoin_wallet.new_address().await?; - event_loop_handle.dial().await?; - let state2 = request_price_and_setup( btc_amount, &mut event_loop_handle, @@ -82,8 +80,6 @@ async fn run_until_internal( BobState::ExecutionSetupDone(state2) } BobState::ExecutionSetupDone(state2) => { - // Do not lock Bitcoin if not connected to Alice. - event_loop_handle.dial().await?; // Alice and Bob have exchanged info let (state3, tx_lock) = state2.lock_btc().await?; let signed_tx = bitcoin_wallet @@ -98,8 +94,6 @@ async fn run_until_internal( // Watch for Alice to Lock Xmr or for cancel timelock to elapse BobState::BtcLocked(state3) => { if let ExpiredTimelocks::None = state3.current_epoch(bitcoin_wallet.as_ref()).await? { - event_loop_handle.dial().await?; - let transfer_proof_watcher = event_loop_handle.recv_transfer_proof(); let cancel_timelock_expires = state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()); @@ -140,8 +134,6 @@ async fn run_until_internal( monero_wallet_restore_blockheight, } => { if let ExpiredTimelocks::None = state.current_epoch(bitcoin_wallet.as_ref()).await? { - event_loop_handle.dial().await?; - let watch_request = state.lock_xmr_watch_request(lock_transfer_proof); select! { @@ -166,7 +158,6 @@ async fn run_until_internal( } BobState::XmrLocked(state) => { if let ExpiredTimelocks::None = state.expired_timelock(bitcoin_wallet.as_ref()).await? { - event_loop_handle.dial().await?; // Alice has locked Xmr // Bob sends Alice his key diff --git a/swap/tests/happy_path_restart_bob_before_comm.rs b/swap/tests/happy_path_restart_bob.rs similarity index 51% rename from swap/tests/happy_path_restart_bob_before_comm.rs rename to swap/tests/happy_path_restart_bob.rs index 6abba569..565eed60 100644 --- a/swap/tests/happy_path_restart_bob_before_comm.rs +++ b/swap/tests/happy_path_restart_bob.rs @@ -2,7 +2,7 @@ pub mod testutils; use swap::protocol::bob::BobState; use swap::protocol::{alice, bob}; -use testutils::bob_run_until::is_xmr_locked; +use testutils::bob_run_until::{is_btc_locked, is_xmr_locked}; use testutils::SlowCancelConfig; #[tokio::test] @@ -32,3 +32,31 @@ async fn given_bob_restarts_after_xmr_is_locked_resume_swap() { }) .await; } + +#[tokio::test] +async fn given_bob_restarts_before_xmr_is_locked_resume_swap() { + testutils::setup_test(SlowCancelConfig, |mut ctx| async move { + let (bob_swap, bob_join_handle) = ctx.bob_swap().await; + let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); + + let alice_swap = ctx.alice_next_swap().await; + let alice_swap = tokio::spawn(alice::run(alice_swap)); + + let bob_state = bob_swap.await??; + + assert!(matches!(bob_state, BobState::BtcLocked { .. })); + + let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); + + let bob_state = bob::run(bob_swap).await?; + + ctx.assert_bob_redeemed(bob_state).await; + + let alice_state = alice_swap.await??; + ctx.assert_alice_redeemed(alice_state).await; + + Ok(()) + }) + .await; +} diff --git a/swap/tests/testutils/mod.rs b/swap/tests/testutils/mod.rs index 1235462e..fb442c2d 100644 --- a/swap/tests/testutils/mod.rs +++ b/swap/tests/testutils/mod.rs @@ -9,7 +9,6 @@ use get_port::get_port; use libp2p::core::Multiaddr; use libp2p::{PeerId, Swarm}; use monero_harness::{image, Monero}; -use std::convert::Infallible; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; @@ -78,7 +77,7 @@ impl BobParams { } } -pub struct BobEventLoopJoinHandle(JoinHandle>); +pub struct BobEventLoopJoinHandle(JoinHandle<()>); impl BobEventLoopJoinHandle { pub fn abort(&self) {