diff --git a/justfile b/justfile index fd8801d9..8c5dc41e 100644 --- a/justfile +++ b/justfile @@ -80,7 +80,7 @@ swap: # Run the asb on testnet asb-testnet: - ASB_DEV_ADDR_OUTPUT_PATH="$(pwd)/src-gui/.env.development" cargo run -p swap-asb --bin asb -- --testnet start --rpc-bind-port 9944 --rpc-bind-host 0.0.0.0 + ASB_DEV_ADDR_OUTPUT_PATH="$(pwd)/src-gui/.env.development" cargo run -p swap-asb --bin asb -- --testnet --trace start --rpc-bind-port 9944 --rpc-bind-host 0.0.0.0 # Launch the ASB controller REPL against a local testnet ASB instance asb-testnet-controller: @@ -140,3 +140,4 @@ code2prompt_single_crate crate: prepare-windows-build: cd dev-scripts && ./ubuntu_build_x86_86-w64-mingw32-gcc.sh + diff --git a/src-gui/src/renderer/components/pages/swap/swap/SwapWidget.tsx b/src-gui/src/renderer/components/pages/swap/swap/SwapWidget.tsx index 03741cb2..04118063 100644 --- a/src-gui/src/renderer/components/pages/swap/swap/SwapWidget.tsx +++ b/src-gui/src/renderer/components/pages/swap/swap/SwapWidget.tsx @@ -52,7 +52,16 @@ export default function SwapWidget() { flex: 1, }} > - + + + {swap.state !== null && ( <> diff --git a/swap-p2p/Cargo.toml b/swap-p2p/Cargo.toml index 54a0c095..2ef5b3cd 100644 --- a/swap-p2p/Cargo.toml +++ b/swap-p2p/Cargo.toml @@ -13,7 +13,7 @@ swap-machine = { path = "../swap-machine" } swap-serde = { path = "../swap-serde" } # Networking -libp2p = { workspace = true, features = ["serde", "request-response", "rendezvous", "cbor", "json", "ping", "identify"] } +libp2p = { workspace = true, features = ["serde", "request-response", "rendezvous", "cbor", "json", "identify", "ping"] } # Serialization asynchronous-codec = "0.7.0" diff --git a/swap-p2p/src/futures_util.rs b/swap-p2p/src/futures_util.rs new file mode 100644 index 00000000..ca222632 --- /dev/null +++ b/swap-p2p/src/futures_util.rs @@ -0,0 +1,71 @@ +use libp2p::futures::future::BoxFuture; +use libp2p::futures::stream::{FuturesUnordered, StreamExt}; +use std::collections::HashSet; +use std::hash::Hash; +use std::task::{Context, Poll}; + +/// A collection of futures with associated keys that can be checked for presence +/// before completion. +/// +/// This combines a HashSet for key tracking with FuturesUnordered for efficient polling. +/// The key is provided during insertion; the future only needs to yield the value. +pub struct FuturesHashSet { + keys: HashSet, + futures: FuturesUnordered>, +} + +impl FuturesHashSet { + pub fn new() -> Self { + Self { + keys: HashSet::new(), + futures: FuturesUnordered::new(), + } + } + + /// Check if a future with the given key is already pending + pub fn contains_key(&self, key: &K) -> bool { + self.keys.contains(key) + } + + /// Insert a new future with the given key. + /// The future should yield V; the key will be paired with it when it completes. + /// Returns true if the key was newly inserted, false if it was already present. + /// If false is returned, the future is not added. + pub fn insert(&mut self, key: K, future: BoxFuture<'static, V>) -> bool { + if self.keys.insert(key.clone()) { + let key_clone = key; + let wrapped = async move { + let value = future.await; + (key_clone, value) + }; + self.futures.push(Box::pin(wrapped)); + true + } else { + false + } + } + + /// Poll for the next completed future. + /// When a future completes, its key is automatically removed from the tracking set. + pub fn poll_next_unpin(&mut self, cx: &mut Context) -> Poll> { + match self.futures.poll_next_unpin(cx) { + Poll::Ready(Some((k, v))) => { + self.keys.remove(&k); + Poll::Ready(Some((k, v))) + } + other => other, + } + } + + pub fn len(&self) -> usize { + assert_eq!(self.keys.len(), self.futures.len()); + + self.keys.len() + } +} + +impl Default for FuturesHashSet { + fn default() -> Self { + Self::new() + } +} diff --git a/swap-p2p/src/lib.rs b/swap-p2p/src/lib.rs index 047f441f..39d54c66 100644 --- a/swap-p2p/src/lib.rs +++ b/swap-p2p/src/lib.rs @@ -1,3 +1,4 @@ +pub mod futures_util; pub mod impl_from_rr_event; pub mod out_event; pub mod protocols; diff --git a/swap-p2p/src/out_event/bob.rs b/swap-p2p/src/out_event/bob.rs index 1f308e76..69fab49d 100644 --- a/swap-p2p/src/out_event/bob.rs +++ b/swap-p2p/src/out_event/bob.rs @@ -6,6 +6,7 @@ use libp2p::{ PeerId, }; +use crate::protocols::redial; use crate::protocols::{ cooperative_xmr_redeem_after_punish::CooperativeXmrRedeemRejectReason, quote::BidQuote, transfer_proof, @@ -17,7 +18,11 @@ pub enum OutEvent { id: OutboundRequestId, response: BidQuote, }, - SwapSetupCompleted(Box>), + SwapSetupCompleted { + peer: PeerId, + swap_id: uuid::Uuid, + result: Box>, + }, TransferProofReceived { msg: Box, channel: ResponseChannel<()>, @@ -53,6 +58,7 @@ pub enum OutEvent { request_id: InboundRequestId, protocol: String, }, + Redial(redial::Event), /// "Fallback" variant that allows the event mapping code to swallow certain /// events that we don't want the caller to deal with. Other, diff --git a/swap-p2p/src/protocols.rs b/swap-p2p/src/protocols.rs index 7a3c8aec..9207dcdd 100644 --- a/swap-p2p/src/protocols.rs +++ b/swap-p2p/src/protocols.rs @@ -1,6 +1,7 @@ pub mod cooperative_xmr_redeem_after_punish; pub mod encrypted_signature; pub mod quote; +pub mod redial; pub mod rendezvous; pub mod swap_setup; pub mod transfer_proof; diff --git a/swap-p2p/src/protocols/redial.rs b/swap-p2p/src/protocols/redial.rs new file mode 100644 index 00000000..e54fe446 --- /dev/null +++ b/swap-p2p/src/protocols/redial.rs @@ -0,0 +1,205 @@ +use crate::futures_util::FuturesHashSet; +use crate::out_event; +use backoff::backoff::Backoff; +use backoff::ExponentialBackoff; +use libp2p::core::Multiaddr; +use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; +use libp2p::swarm::{NetworkBehaviour, ToSwarm}; +use libp2p::PeerId; +use std::collections::{HashMap, HashSet, VecDeque}; +use std::task::{Context, Poll}; +use std::time::Duration; +use void::Void; + +/// A [`NetworkBehaviour`] that tracks whether we are connected to the given +/// peers and attempts to re-establish a connection with an exponential backoff +/// if we lose the connection. +pub struct Behaviour { + /// The peers we are interested in. + peers: HashSet, + /// Tracks sleep timers for each peer waiting to redial. + /// Futures in here yield the PeerId and when a Future completes we dial that peer + sleep: FuturesHashSet, + /// Tracks the current backoff state for each peer. + backoff: HashMap, + /// Initial interval for backoff. + initial_interval: Duration, + /// Maximum interval for backoff. + max_interval: Duration, + /// A queue of events to be sent to the swarm. + to_swarm: VecDeque, +} + +impl Behaviour { + pub fn new(interval: Duration, max_interval: Duration) -> Self { + Self { + peers: HashSet::default(), + sleep: FuturesHashSet::new(), + backoff: HashMap::new(), + initial_interval: interval, + max_interval, + to_swarm: VecDeque::new(), + } + } + + /// Adds a peer to the set of peers to track. Returns true if the peer was newly added. + pub fn add_peer(&mut self, peer: PeerId) -> bool { + let newly_added = self.peers.insert(peer); + + // If the peer is newly added, schedule a dial immediately + if newly_added { + self.sleep.insert(peer, Box::pin(std::future::ready(()))); + } + + newly_added + } + + fn get_backoff(&mut self, peer: &PeerId) -> &mut ExponentialBackoff { + self.backoff.entry(*peer).or_insert_with(|| { + ExponentialBackoff { + initial_interval: self.initial_interval, + current_interval: self.initial_interval, + max_interval: self.max_interval, + // We never give up on re-dialling + max_elapsed_time: None, + ..ExponentialBackoff::default() + } + }) + } + + pub fn has_pending_redial(&self, peer: &PeerId) -> bool { + self.sleep.contains_key(peer) + } +} + +#[derive(Debug)] +pub enum Event { + ScheduledRedial { + peer: PeerId, + next_dial_in: Duration, + }, +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = libp2p::swarm::dummy::ConnectionHandler; + type ToSwarm = Event; + + fn handle_established_inbound_connection( + &mut self, + _connection_id: libp2p::swarm::ConnectionId, + peer: PeerId, + _local_addr: &Multiaddr, + _remote_addr: &Multiaddr, + ) -> Result, libp2p::swarm::ConnectionDenied> { + // TOOD: Uncomment this if we want to redial ALL peers we ever connected to + // Add the peer if it's not already tracked. + // self.add_peer(peer); + + // Reset the backoff state to start with the initial interval again once we disconnect again + if let Some(backoff) = self.backoff.get_mut(&peer) { + backoff.reset(); + } + + Ok(Self::ConnectionHandler {}) + } + + fn handle_established_outbound_connection( + &mut self, + _connection_id: libp2p::swarm::ConnectionId, + peer: PeerId, + _addr: &Multiaddr, + _role_override: libp2p::core::Endpoint, + ) -> Result, libp2p::swarm::ConnectionDenied> { + // TOOD: Uncomment this if we want to redial ALL peers we ever connected to + // Add the peer if it's not already tracked. + // self.add_peer(peer); + + // Reset the backoff state to start with the initial interval again once we disconnect again + if let Some(backoff) = self.backoff.get_mut(&peer) { + backoff.reset(); + } + + Ok(Self::ConnectionHandler {}) + } + + fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm<'_>) { + let peer_to_redial = match event { + libp2p::swarm::FromSwarm::ConnectionClosed(e) if self.peers.contains(&e.peer_id) => { + Some(e.peer_id) + } + libp2p::swarm::FromSwarm::DialFailure(e) => match e.peer_id { + Some(peer_id) if self.peers.contains(&peer_id) => Some(peer_id), + _ => None, + }, + _ => None, + }; + + if let Some(peer) = peer_to_redial { + let backoff = self.get_backoff(&peer); + + let next_dial_in = backoff + .next_backoff() + .expect("redial backoff should never run out of attempts"); + + if self.sleep.insert( + peer, + Box::pin(async move { + tokio::time::sleep(next_dial_in).await; + }), + ) { + self.to_swarm + .push_back(Event::ScheduledRedial { peer, next_dial_in }); + + tracing::info!( + peer_id = %peer, + seconds_until_next_redial = %next_dial_in.as_secs(), + "Waiting for next redial attempt" + ); + } + } + } + + fn poll(&mut self, cx: &mut Context<'_>) -> std::task::Poll> { + // Check if we have any event to send to the swarm + if let Some(event) = self.to_swarm.pop_front() { + return Poll::Ready(ToSwarm::GenerateEvent(event)); + } + + // Check if any peer's sleep timer has completed + // If it has, dial that peer + match self.sleep.poll_next_unpin(cx) { + Poll::Ready(Some((peer, _))) => { + // Actually dial the peer + Poll::Ready(ToSwarm::Dial { + opts: DialOpts::peer_id(peer) + // TODO: Maybe use DisconnectedAndNotDialing here? + .condition(PeerCondition::Disconnected) + .build(), + }) + } + Poll::Ready(None) | Poll::Pending => Poll::Pending, + } + } + + fn on_connection_handler_event( + &mut self, + _peer_id: PeerId, + _connection_id: libp2p::swarm::ConnectionId, + _event: libp2p::swarm::THandlerOutEvent, + ) { + unreachable!("The re-dial dummy connection handler does not produce any events"); + } +} + +impl From for out_event::bob::OutEvent { + fn from(event: Event) -> Self { + out_event::bob::OutEvent::Redial(event) + } +} + +impl From for out_event::alice::OutEvent { + fn from(_event: Event) -> Self { + // TODO: Once this is used by Alice, convert this to a proper event + out_event::alice::OutEvent::Other + } +} diff --git a/swap-p2p/src/protocols/swap_setup/bob.rs b/swap-p2p/src/protocols/swap_setup/bob.rs index c98ffb01..279eb0e7 100644 --- a/swap-p2p/src/protocols/swap_setup/bob.rs +++ b/swap-p2p/src/protocols/swap_setup/bob.rs @@ -8,12 +8,15 @@ use futures::future::{BoxFuture, OptionFuture}; use futures::AsyncWriteExt; use futures::FutureExt; use libp2p::core::upgrade; +use libp2p::swarm::behaviour::ConnectionEstablished; +use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; use libp2p::swarm::{ - ConnectionDenied, ConnectionHandler, ConnectionHandlerEvent, ConnectionId, FromSwarm, - NetworkBehaviour, SubstreamProtocol, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, + ConnectionClosed, ConnectionDenied, ConnectionHandler, ConnectionHandlerEvent, ConnectionId, + FromSwarm, NetworkBehaviour, SubstreamProtocol, THandler, THandlerInEvent, THandlerOutEvent, + ToSwarm, }; use libp2p::{Multiaddr, PeerId}; -use std::collections::VecDeque; +use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::Arc; use std::task::Poll; use std::time::Duration; @@ -29,8 +32,29 @@ use super::{read_cbor_message, write_cbor_message, SpotPriceRequest}; pub struct Behaviour { env_config: env::Config, bitcoin_wallet: Arc, - new_swaps: VecDeque<(PeerId, NewSwap)>, - completed_swaps: VecDeque<(PeerId, Completed)>, + + // Queue of swap setup request that haven't been assigned to a connection handler yet + // (peer_id, swap_id, new_swap) + new_swaps: VecDeque<(PeerId, Uuid, NewSwap)>, + + // Maintains the list of connections handlers for a specific peer + // + // 0. List of connection handlers that are still active but haven't been assigned a swap setup request yet + // 1. List of connection handlers that have died. Once their death is acknowledged / processed, they are removed from the list + connection_handlers: HashMap, VecDeque)>, + + // Queue of completed swaps that we have assigned a connection handler to but where we haven't notified the ConnectionHandler yet + // We notify the ConnectionHandler by emitting a ConnectionHandlerEvent::NotifyBehaviour event + assigned_unnotified_swaps: VecDeque<(ConnectionId, PeerId, Uuid, NewSwap)>, + + // Maintains the list of requests that we have sent to a connection handler but haven't yet received a response + inflight_requests: HashMap, + + // Queue of swap setup results that we want to notify the Swarm about + to_swarm: VecDeque, + + // Queue of peers that we want to instruct the Swarm to dial + to_dial: VecDeque, } impl Behaviour { @@ -39,24 +63,53 @@ impl Behaviour { env_config, bitcoin_wallet, new_swaps: VecDeque::default(), - completed_swaps: VecDeque::default(), + to_swarm: VecDeque::default(), + assigned_unnotified_swaps: VecDeque::default(), + inflight_requests: HashMap::default(), + connection_handlers: HashMap::default(), + to_dial: VecDeque::default(), } } - pub async fn start(&mut self, alice: PeerId, swap: NewSwap) { - self.new_swaps.push_back((alice, swap)) - } -} + pub async fn start(&mut self, alice_peer_id: PeerId, swap: NewSwap) { + tracing::trace!( + %alice_peer_id, + ?swap, + "Queuing new swap setup request inside the Behaviour", + ); -impl From for out_event::bob::OutEvent { - fn from(completed: Completed) -> Self { - out_event::bob::OutEvent::SwapSetupCompleted(Box::new(completed.0)) + // TODO: This is a bit redundant because we already have the swap_id in the NewSwap struct + self.new_swaps + .push_back((alice_peer_id, swap.swap_id, swap)); + self.to_dial.push_back(alice_peer_id); + } + + // Returns a mutable reference to the queues of the connection handlers for a specific peer + fn connection_handlers_mut( + &mut self, + peer_id: PeerId, + ) -> &mut (VecDeque, VecDeque) { + self.connection_handlers.entry(peer_id).or_default() + } + + // Returns a mutable reference to the queues of the connection handlers for a specific peer + fn alive_connection_handlers_mut(&mut self, peer_id: PeerId) -> &mut VecDeque { + &mut self.connection_handlers_mut(peer_id).0 + } + + // Returns a mutable reference to the queues of the connection handlers for a specific peer + fn dead_connection_handlers_mut(&mut self, peer_id: PeerId) -> &mut VecDeque { + &mut self.connection_handlers_mut(peer_id).1 + } + + fn known_peers(&self) -> HashSet { + self.connection_handlers.keys().copied().collect() } } impl NetworkBehaviour for Behaviour { type ConnectionHandler = Handler; - type ToSwarm = Completed; + type ToSwarm = SwapSetupResult; fn handle_established_inbound_connection( &mut self, @@ -78,17 +131,57 @@ impl NetworkBehaviour for Behaviour { Ok(Handler::new(self.env_config, self.bitcoin_wallet.clone())) } - fn on_swarm_event(&mut self, _event: FromSwarm<'_>) { - // We do not need to handle swarm events + fn on_swarm_event(&mut self, event: FromSwarm<'_>) { + match event { + FromSwarm::ConnectionEstablished(ConnectionEstablished { + peer_id, + connection_id, + endpoint, + .. + }) => { + tracing::trace!( + peer = %peer_id, + connection_id = %connection_id, + endpoint = ?endpoint, + "A new connection handler has been established", + ); + + self.alive_connection_handlers_mut(peer_id) + .push_back(connection_id); + } + FromSwarm::ConnectionClosed(ConnectionClosed { + peer_id, + connection_id, + .. + }) => { + tracing::trace!( + peer = %peer_id, + connection_id = %connection_id, + "A swap setup connection handler has died", + ); + + self.dead_connection_handlers_mut(peer_id) + .push_back(connection_id); + } + _ => {} + } } fn on_connection_handler_event( &mut self, - peer_id: PeerId, - _connection_id: libp2p::swarm::ConnectionId, - event: THandlerOutEvent, + event_peer_id: PeerId, + connection_id: libp2p::swarm::ConnectionId, + result: THandlerOutEvent, ) { - self.completed_swaps.push_back((peer_id, event)); + if let Some((swap_id, peer)) = self.inflight_requests.remove(&connection_id) { + assert_eq!(peer, event_peer_id); + + self.to_swarm.push_back(SwapSetupResult { + peer, + swap_id, + result, + }); + } } fn poll( @@ -96,19 +189,127 @@ impl NetworkBehaviour for Behaviour { _cx: &mut std::task::Context<'_>, ) -> Poll>> { // Forward completed swaps from the connection handler to the swarm - if let Some((_peer, completed)) = self.completed_swaps.pop_front() { + if let Some(completed) = self.to_swarm.pop_front() { + tracing::trace!( + peer = %completed.peer, + "Forwarding completed swap setup from Behaviour to the Swarm", + ); + return Poll::Ready(ToSwarm::GenerateEvent(completed)); } - // If there is a new swap to be started, send it to the connection handler - if let Some((peer, event)) = self.new_swaps.pop_front() { - return Poll::Ready(ToSwarm::NotifyHandler { - peer_id: peer, - handler: libp2p::swarm::NotifyHandler::Any, - event, + // Forward any peers that we want to dial to the Swarm + if let Some(peer) = self.to_dial.pop_front() { + tracing::trace!( + peer = %peer, + "Instructing swarm to dial a new connection handler for a swap setup request", + ); + + return Poll::Ready(ToSwarm::Dial { + opts: DialOpts::peer_id(peer) + .condition(PeerCondition::DisconnectedAndNotDialing) + .build(), }); } + // Remove any unused already dead connection handlers that were never assigned a request + for peer in self.known_peers() { + let (alive_connection_handlers, dead_connection_handlers) = + self.connection_handlers_mut(peer); + + // Create sets for efficient lookup + let alive_set: HashSet<_> = alive_connection_handlers.iter().copied().collect(); + let dead_set: HashSet<_> = dead_connection_handlers.iter().copied().collect(); + + // Remove from alive any handlers that are also in dead + alive_connection_handlers.retain(|id| !dead_set.contains(id)); + + // Remove from dead any handlers that were in alive (the overlap we just processed) + dead_connection_handlers.retain(|id| !alive_set.contains(id)); + } + + // Go through our new_swaps and try to assign a request to a connection handler + // + // If we find a connection handler for the peer, it will be removed from new_swaps + // If we don't find a connection handler for the peer, it will remain in new_swaps + { + let new_swaps = &mut self.new_swaps; + let connection_handlers = &mut self.connection_handlers; + let assigned_unnotified_swaps = &mut self.assigned_unnotified_swaps; + + let mut remaining = std::collections::VecDeque::new(); + for (peer, swap_id, new_swap) in new_swaps.drain(..) { + if let Some(connection_id) = + connection_handlers.entry(peer).or_default().0.pop_front() + { + assigned_unnotified_swaps.push_back((connection_id, peer, swap_id, new_swap)); + } else { + remaining.push_back((peer, swap_id, new_swap)); + } + } + + *new_swaps = remaining; + } + + // If a connection handler died which had an assigned swap setup request, + // we need to notify the swarm that the request failed + for peer_id in self.known_peers() { + while let Some(connection_id) = self.dead_connection_handlers_mut(peer_id).pop_front() { + if let Some((swap_id, _)) = self.inflight_requests.remove(&connection_id) { + self.to_swarm.push_back(SwapSetupResult { + peer: peer_id, + swap_id, + result: Err(anyhow::anyhow!("Connection handler for peer {} has died after we notified it of the swap setup request", peer_id)), + }); + } + } + } + + // Iterate through our assigned_unnotified_swaps queue (with popping) + if let Some((connection_id, peer_id, swap_id, new_swap)) = + self.assigned_unnotified_swaps.pop_front() + { + tracing::trace!( + swap_id = %swap_id, + connection_id = %connection_id, + ?new_swap, + "Dispatching swap setup request from Behaviour to a specific connection handler", + ); + + // Check if the connection handler is still alive + if let Some(dead_connection_handler) = self + .dead_connection_handlers_mut(peer_id) + .iter() + .position(|id| *id == connection_id) + { + self.dead_connection_handlers_mut(peer_id) + .remove(dead_connection_handler); + + self.to_swarm.push_back(SwapSetupResult { + peer: peer_id, + swap_id, + result: Err(anyhow::anyhow!("Connection handler for peer {} has died before we could notify it of the swap setup request", peer_id)), + }); + } else { + // ConnectionHandler must still be alive, notify it of the swap setup request + tracing::trace!( + peer = %peer_id, + swap_id = %swap_id, + ?new_swap, + "Notifying connection handler of the swap setup request. We are assuming it is still alive.", + ); + + self.inflight_requests + .insert(connection_id, (swap_id, peer_id)); + + return Poll::Ready(ToSwarm::NotifyHandler { + peer_id, + handler: libp2p::swarm::NotifyHandler::One(connection_id), + event: new_swap, + }); + } + } + Poll::Pending } } @@ -132,6 +333,8 @@ impl Handler { timeout: Duration::from_secs(120), new_swaps: VecDeque::default(), bitcoin_wallet, + // TODO: This will keep ALL connections alive indefinitely + // which is not optimal keep_alive: true, } } @@ -148,11 +351,15 @@ pub struct NewSwap { } #[derive(Debug)] -pub struct Completed(Result); +pub struct SwapSetupResult { + peer: PeerId, + swap_id: Uuid, + result: Result, +} impl ConnectionHandler for Handler { type FromBehaviour = NewSwap; - type ToBehaviour = Completed; + type ToBehaviour = Result; type InboundProtocol = upgrade::DeniedUpgrade; type OutboundProtocol = protocol::SwapSetup; type InboundOpenInfo = (); @@ -175,7 +382,7 @@ impl ConnectionHandler for Handler { ) { match event { libp2p::swarm::handler::ConnectionEvent::FullyNegotiatedInbound(_) => { - unreachable!("Bob does not support inbound substreams") + // TODO: Maybe warn here as Bob does not support inbound substreams? } libp2p::swarm::handler::ConnectionEvent::FullyNegotiatedOutbound(outbound) => { let mut substream = outbound.protocol; @@ -185,85 +392,17 @@ impl ConnectionHandler for Handler { let env_config = self.env_config; let protocol = tokio::time::timeout(self.timeout, async move { - let result = async { - // Here we request the spot price from Alice - write_cbor_message( - &mut substream, - SpotPriceRequest { - btc: new_swap_request.btc, - blockchain_network: BlockchainNetwork { - bitcoin: env_config.bitcoin_network, - monero: env_config.monero_network, - }, - }, - ) - .await - .context("Failed to send spot price request to Alice")?; - - // Here we read the spot price response from Alice - // The outer ? checks if Alice responded with an error (SpotPriceError) - let xmr = Result::from( - // The inner ? is for the read_cbor_message function - // It will return an error if the deserialization fails - read_cbor_message::(&mut substream) - .await - .context("Failed to read spot price response from Alice")?, - )?; - - let state0 = State0::new( - new_swap_request.swap_id, - &mut rand::thread_rng(), - new_swap_request.btc, - xmr, - env_config.bitcoin_cancel_timelock.into(), - env_config.bitcoin_punish_timelock.into(), - new_swap_request.bitcoin_refund_address.clone(), - env_config.monero_finality_confirmations, - new_swap_request.tx_refund_fee, - new_swap_request.tx_cancel_fee, - new_swap_request.tx_lock_fee, - ); - - write_cbor_message(&mut substream, state0.next_message()) - .await - .context("Failed to send state0 message to Alice")?; - let message1 = read_cbor_message::(&mut substream) - .await - .context("Failed to read message1 from Alice")?; - let state1 = state0 - .receive(bitcoin_wallet.as_ref(), message1) - .await - .context("Failed to receive state1")?; - write_cbor_message(&mut substream, state1.next_message()) - .await - .context("Failed to send state1 message")?; - let message3 = read_cbor_message::(&mut substream) - .await - .context("Failed to read message3 from Alice")?; - let state2 = state1 - .receive(message3) - .context("Failed to receive state2")?; - - write_cbor_message(&mut substream, state2.next_message()) - .await - .context("Failed to send state2 message")?; - - substream - .flush() - .await - .context("Failed to flush substream")?; - substream - .close() - .await - .context("Failed to close substream")?; - - Ok(state2) - } + let result = run_swap_setup( + &mut substream, + new_swap_request, + env_config, + bitcoin_wallet, + ) .await; - result.map_err(|e: anyhow::Error| { - tracing::error!("Error occurred during swap setup protocol: {:#}", e); - Error::Other + result.map_err(|err: anyhow::Error| { + tracing::error!(?err, "Error occurred during swap setup protocol"); + Error::Protocol(format!("{:?}", err)) }) }); @@ -275,11 +414,25 @@ impl ConnectionHandler for Handler { })? }) as OutboundStream)); - - // Once the outbound stream is created, we keep the connection alive - self.keep_alive = true; } - _ => {} + libp2p::swarm::handler::ConnectionEvent::AddressChange(address_change) => { + tracing::trace!( + ?address_change, + "Connection address changed during swap setup" + ); + } + libp2p::swarm::handler::ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { + tracing::trace!(error = %dial_upgrade_error.error, "Dial upgrade error during swap setup"); + } + libp2p::swarm::handler::ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => { + tracing::trace!( + ?listen_upgrade_error, + "Listen upgrade error during swap setup" + ); + } + _ => { + // We ignore the rest of events + } } } @@ -297,8 +450,15 @@ impl ConnectionHandler for Handler { ) -> Poll< ConnectionHandlerEvent, > { - // Check if there is a new swap to be started + // Check if there is a new swap to be started on this connection + // Has the Behaviour assigned us a new swap to be started on this connection? if let Some(new_swap) = self.new_swaps.pop_front() { + tracing::trace!( + ?new_swap.swap_id, + "Instructing swarm to start a new outbound substream as part of swap setup", + ); + + // Keep the connection alive because we want to use it self.keep_alive = true; // We instruct the swarm to start a new outbound substream @@ -315,15 +475,123 @@ impl ConnectionHandler for Handler { self.keep_alive = false; // We notify the swarm that the swap setup is completed / failed - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Completed( - result.map_err(anyhow::Error::from), - ))); + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + result.map_err(anyhow::Error::from).into(), + )); } Poll::Pending } } +async fn run_swap_setup( + mut substream: &mut libp2p::swarm::Stream, + new_swap_request: NewSwap, + env_config: env::Config, + bitcoin_wallet: Arc, +) -> Result { + // Here we request the spot price from Alice + write_cbor_message( + &mut substream, + SpotPriceRequest { + btc: new_swap_request.btc, + blockchain_network: BlockchainNetwork { + bitcoin: env_config.bitcoin_network, + monero: env_config.monero_network, + }, + }, + ) + .await + .context("Failed to send spot price request to Alice")?; + + // Here we read the spot price response from Alice + // The outer ? checks if Alice responded with an error (SpotPriceError) + let xmr = Result::from( + // The inner ? is for the read_cbor_message function + // It will return an error if the deserialization fails + read_cbor_message::(&mut substream) + .await + .context("Failed to read spot price response from Alice")?, + )?; + + tracing::trace!( + %new_swap_request.swap_id, + xmr = %xmr, + btc = %new_swap_request.btc, + "Got spot price response from Alice as part of swap setup", + ); + + let state0 = State0::new( + new_swap_request.swap_id, + &mut rand::thread_rng(), + new_swap_request.btc, + xmr, + env_config.bitcoin_cancel_timelock.into(), + env_config.bitcoin_punish_timelock.into(), + new_swap_request.bitcoin_refund_address.clone(), + env_config.monero_finality_confirmations, + new_swap_request.tx_refund_fee, + new_swap_request.tx_cancel_fee, + new_swap_request.tx_lock_fee, + ); + + tracing::trace!( + %new_swap_request.swap_id, + "Transitioned into state0 during swap setup", + ); + + write_cbor_message(&mut substream, state0.next_message()) + .await + .context("Failed to send state0 message to Alice")?; + let message1 = read_cbor_message::(&mut substream) + .await + .context("Failed to read message1 from Alice")?; + let state1 = state0 + .receive(bitcoin_wallet.as_ref(), message1) + .await + .context("Failed to receive state1")?; + + tracing::trace!( + %new_swap_request.swap_id, + "Transitioned into state1 during swap setup", + ); + + write_cbor_message(&mut substream, state1.next_message()) + .await + .context("Failed to send state1 message")?; + let message3 = read_cbor_message::(&mut substream) + .await + .context("Failed to read message3 from Alice")?; + let state2 = state1 + .receive(message3) + .context("Failed to receive state2")?; + + tracing::trace!( + %new_swap_request.swap_id, + "Transitioned into state2 during swap setup", + ); + + write_cbor_message(&mut substream, state2.next_message()) + .await + .context("Failed to send state2 message")?; + + substream + .flush() + .await + .context("Failed to flush substream")?; + substream + .close() + .await + .context("Failed to close substream")?; + + tracing::trace!( + %new_swap_request.swap_id, + "Swap setup completed", + ); + + Ok(state2) +} + impl From for Result { fn from(response: SpotPriceResponse) -> Self { match response { @@ -359,6 +627,11 @@ pub enum Error { #[error("Failed to complete swap setup within {seconds}s")] Timeout { seconds: u64 }, + /// Something went wrong during the swap setup protocol that is not covered by the other errors + /// but where we have some context about the error + #[error("Something went wrong during the swap setup protocol: {0}")] + Protocol(String), + /// To be used for errors that cannot be explained on the CLI side (e.g. /// rate update problems on the seller side) #[error("Seller encountered a problem, please try again later.")] @@ -383,3 +656,13 @@ impl From for Error { } } } + +impl From for out_event::bob::OutEvent { + fn from(completed: SwapSetupResult) -> Self { + out_event::bob::OutEvent::SwapSetupCompleted { + result: Box::new(completed.result), + swap_id: completed.swap_id, + peer: completed.peer, + } + } +} diff --git a/swap/src/cli.rs b/swap/src/cli.rs index 77be9aff..64b8b096 100644 --- a/swap/src/cli.rs +++ b/swap/src/cli.rs @@ -9,7 +9,7 @@ pub mod watcher; pub use behaviour::{Behaviour, OutEvent}; pub use cancel_and_refund::{cancel, cancel_and_refund, refund}; -pub use event_loop::{EventLoop, EventLoopHandle}; +pub use event_loop::{EventLoop, EventLoopHandle, SwapEventLoopHandle}; pub use list_sellers::{list_sellers, SellerStatus}; #[cfg(test)] diff --git a/swap/src/cli/api/request.rs b/swap/src/cli/api/request.rs index b7af2645..f3240f2a 100644 --- a/swap/src/cli/api/request.rs +++ b/swap/src/cli/api/request.rs @@ -1106,7 +1106,6 @@ pub async fn buy_xmr( .await?; let behaviour = cli::Behaviour::new( - seller_peer_id, env_config, bitcoin_wallet.clone(), (seed.derive_libp2p_identity(), namespace), @@ -1131,9 +1130,7 @@ pub async fn buy_xmr( TauriSwapProgressEvent::ReceivedQuote(quote.clone()), ); - // Now create the event loop we use for the swap - let (event_loop, event_loop_handle) = - EventLoop::new(swap_id, swarm, seller_peer_id, db.clone())?; + let (event_loop, mut event_loop_handle) = EventLoop::new(swarm, db.clone())?; let event_loop = tokio::spawn(event_loop.run().in_current_span()); tauri_handle.emit_swap_progress_event(swap_id, TauriSwapProgressEvent::ReceivedQuote(quote)); @@ -1161,13 +1158,14 @@ pub async fn buy_xmr( } }, swap_result = async { + let swap_event_loop_handle = event_loop_handle.swap_handle(seller_peer_id, swap_id).await?; let swap = Swap::new( db.clone(), swap_id, bitcoin_wallet.clone(), monero_wallet, env_config, - event_loop_handle, + swap_event_loop_handle, monero_receive_pool.clone(), bitcoin_change_address_for_spawn, tx_lock_amount, @@ -1225,7 +1223,6 @@ pub async fn resume_swap( .derive_libp2p_identity(); let behaviour = cli::Behaviour::new( - seller_peer_id, config.env_config, bitcoin_wallet.clone(), (seed.clone(), config.namespace), @@ -1240,20 +1237,22 @@ pub async fn resume_swap( swarm.add_peer_address(seller_peer_id, seller_address); } - let (event_loop, event_loop_handle) = - EventLoop::new(swap_id, swarm, seller_peer_id, db.clone())?; + let (event_loop, mut event_loop_handle) = EventLoop::new(swarm, db.clone())?; let monero_receive_pool = db.get_monero_address_pool(swap_id).await?; let tauri_handle = context.tauri_handle.clone(); + let swap_event_loop_handle = event_loop_handle + .swap_handle(seller_peer_id, swap_id) + .await?; let swap = Swap::from_db( db.clone(), swap_id, bitcoin_wallet, monero_manager, config.env_config, - event_loop_handle, + swap_event_loop_handle, monero_receive_pool, ) .await? diff --git a/swap/src/cli/behaviour.rs b/swap/src/cli/behaviour.rs index c745a092..d2fbeba3 100644 --- a/swap/src/cli/behaviour.rs +++ b/swap/src/cli/behaviour.rs @@ -6,7 +6,7 @@ use crate::network::{ use anyhow::Result; use bitcoin_wallet::BitcoinWallet; use libp2p::swarm::NetworkBehaviour; -use libp2p::{identify, identity, ping, PeerId}; +use libp2p::{identify, identity, ping}; use std::sync::Arc; use std::time::Duration; use swap_env::env; @@ -38,7 +38,6 @@ pub struct Behaviour { impl Behaviour { pub fn new( - alice: PeerId, env_config: env::Config, bitcoin_wallet: Arc, identify_params: (identity::Keypair, XmrBtcNamespace), @@ -57,7 +56,7 @@ impl Behaviour { transfer_proof: transfer_proof::bob(), encrypted_signature: encrypted_signature::bob(), cooperative_xmr_redeem: cooperative_xmr_redeem_after_punish::bob(), - redial: redial::Behaviour::new(alice, INITIAL_REDIAL_INTERVAL, MAX_REDIAL_INTERVAL), + redial: redial::Behaviour::new(INITIAL_REDIAL_INTERVAL, MAX_REDIAL_INTERVAL), ping: ping::Behaviour::new(pingConfig), identify: identify::Behaviour::new(identifyConfig), } diff --git a/swap/src/cli/event_loop.rs b/swap/src/cli/event_loop.rs index 8c61941f..79adebc4 100644 --- a/swap/src/cli/event_loop.rs +++ b/swap/src/cli/event_loop.rs @@ -7,17 +7,18 @@ use crate::network::swap_setup::bob::NewSwap; use crate::protocol::bob::swap::has_already_processed_transfer_proof; use crate::protocol::bob::{BobState, State2}; use crate::protocol::Database; -use anyhow::{anyhow, Context, Result}; -use futures::future::{BoxFuture, OptionFuture}; +use anyhow::{anyhow, bail, Context, Result}; +use futures::future::BoxFuture; +use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; use libp2p::request_response::{OutboundFailure, OutboundRequestId, ResponseChannel}; -use libp2p::swarm::dial_opts::DialOpts; use libp2p::swarm::SwarmEvent; use libp2p::{PeerId, Swarm}; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use swap_core::bitcoin::EncryptedSignature; +use swap_p2p::protocols::redial; use uuid::Uuid; static REQUEST_RESPONSE_PROTOCOL_TIMEOUT: Duration = Duration::from_secs(60); @@ -25,199 +26,238 @@ static EXECUTION_SETUP_PROTOCOL_TIMEOUT: Duration = Duration::from_secs(120); #[allow(missing_debug_implementations)] pub struct EventLoop { - swap_id: Uuid, swarm: libp2p::Swarm, - alice_peer_id: PeerId, db: Arc, - // These streams represents outgoing requests that we have to make - // These are essentially queues of requests that we will send to Alice once we are connected to her. - quote_requests: bmrng::RequestReceiverStream<(), Result>, - cooperative_xmr_redeem_requests: bmrng::RequestReceiverStream< + // When a new `SwapEventLoopHandle` is created: + // 1. a channel is created for the EventLoop to send transfer_proofs to SwapEventLoopHandle + // 2. the corresponding PeerId of Alice is stored + // + // The sender of the channel is sent into this queue. The receiver is stored in the `SwapEventLoopHandle`. + // + // This is polled and then moved into `registered_swap_handlers` + queued_swap_handlers: bmrng::unbounded::UnboundedRequestReceiverStream< + ( + Uuid, + PeerId, + bmrng::unbounded::UnboundedRequestSender, + ), (), + >, + registered_swap_handlers: HashMap< + Uuid, + ( + PeerId, + bmrng::unbounded::UnboundedRequestSender, + ), + >, + + // These streams represents outgoing requests that we have to make (queues) + // + // Requests are keyed by the PeerId because they do not correspond to an existing swap yet + quote_requests: + bmrng::unbounded::UnboundedRequestReceiverStream>, + // TODO: technically NewSwap.swap_id already contains the id of the swap + execution_setup_requests: + bmrng::unbounded::UnboundedRequestReceiverStream<(PeerId, NewSwap), Result>, + + // These streams represents outgoing requests that we have to make (queues) + // + // Requests are keyed by the swap_id because they correspond to a specific swap + cooperative_xmr_redeem_requests: bmrng::unbounded::UnboundedRequestReceiverStream< + (PeerId, Uuid), Result, >, - encrypted_signatures_requests: - bmrng::RequestReceiverStream>, - execution_setup_requests: bmrng::RequestReceiverStream>, + encrypted_signatures_requests: bmrng::unbounded::UnboundedRequestReceiverStream< + (PeerId, Uuid, EncryptedSignature), + Result<(), OutboundFailure>, + >, // These represents requests that are currently in-flight. // Meaning that we have sent them to Alice, but we have not yet received a response. // Once we get a response to a matching [`RequestId`], we will use the responder to relay the // response. - inflight_quote_requests: - HashMap>>, - inflight_encrypted_signature_requests: - HashMap>>, - inflight_swap_setup: Option>>, + inflight_quote_requests: HashMap< + OutboundRequestId, + bmrng::unbounded::UnboundedResponder>, + >, + inflight_encrypted_signature_requests: HashMap< + OutboundRequestId, + bmrng::unbounded::UnboundedResponder>, + >, + inflight_swap_setup: + HashMap<(PeerId, Uuid), bmrng::unbounded::UnboundedResponder>>, inflight_cooperative_xmr_redeem_requests: HashMap< OutboundRequestId, - bmrng::Responder>, + bmrng::unbounded::UnboundedResponder< + Result, + >, >, - /// The sender we will use to relay incoming transfer proofs to the EventLoopHandle - /// The corresponding receiver is stored in the EventLoopHandle - transfer_proof_sender: bmrng::RequestSender, - - /// The future representing the successful handling of an incoming transfer - /// proof. + /// The future representing the successful handling of an incoming transfer proof (by the state machine) /// - /// Once we've sent a transfer proof to the ongoing swap, this future waits - /// until the swap took it "out" of the `EventLoopHandle`. As this future - /// resolves, we use the `ResponseChannel` returned from it to send an ACK - /// to Alice that we have successfully processed the transfer proof. - pending_transfer_proof: OptionFuture>>, + /// Once we've sent a transfer proof to the ongoing swap, a future is inserted into this set + /// which will resolve once the state machine has "processed" the transfer proof. + /// + /// The future will yield the swap_id and the response channel which are used to send an acknowledgement to Alice. + pending_transfer_proof_acks: FuturesUnordered)>>, } impl EventLoop { + fn swap_peer_id(&self, swap_id: &Uuid) -> Option { + self.registered_swap_handlers + .get(swap_id) + .map(|(peer_id, _)| *peer_id) + } + pub fn new( - swap_id: Uuid, swarm: Swarm, - alice_peer_id: PeerId, db: Arc, ) -> Result<(Self, EventLoopHandle)> { - // We still use a timeout here, because this protocol does not dial Alice itself - // and we want to fail if we cannot reach Alice + // We still use a timeout here because we trust our own implementation of the swap setup protocol less than the libp2p library let (execution_setup_sender, execution_setup_receiver) = - bmrng::channel_with_timeout(1, EXECUTION_SETUP_PROTOCOL_TIMEOUT); + bmrng::unbounded::channel_with_timeout(EXECUTION_SETUP_PROTOCOL_TIMEOUT); // It is okay to not have a timeout here, as timeouts are enforced by the request-response protocol - let (transfer_proof_sender, transfer_proof_receiver) = bmrng::channel(1); - let (encrypted_signature_sender, encrypted_signature_receiver) = bmrng::channel(1); - let (quote_sender, quote_receiver) = bmrng::channel(1); - let (cooperative_xmr_redeem_sender, cooperative_xmr_redeem_receiver) = bmrng::channel(1); + let (encrypted_signature_sender, encrypted_signature_receiver) = + bmrng::unbounded::channel(); + let (quote_sender, quote_receiver) = bmrng::unbounded::channel(); + let (cooperative_xmr_redeem_sender, cooperative_xmr_redeem_receiver) = + bmrng::unbounded::channel(); + let (queued_transfer_proof_sender, queued_transfer_proof_receiver) = + bmrng::unbounded::channel(); let event_loop = EventLoop { - swap_id, swarm, - alice_peer_id, + db, + queued_swap_handlers: queued_transfer_proof_receiver.into(), + registered_swap_handlers: HashMap::default(), execution_setup_requests: execution_setup_receiver.into(), - transfer_proof_sender, encrypted_signatures_requests: encrypted_signature_receiver.into(), cooperative_xmr_redeem_requests: cooperative_xmr_redeem_receiver.into(), quote_requests: quote_receiver.into(), inflight_quote_requests: HashMap::default(), - inflight_swap_setup: None, + inflight_swap_setup: HashMap::default(), inflight_encrypted_signature_requests: HashMap::default(), inflight_cooperative_xmr_redeem_requests: HashMap::default(), - pending_transfer_proof: OptionFuture::from(None), - db, + pending_transfer_proof_acks: FuturesUnordered::new(), }; let handle = EventLoopHandle { execution_setup_sender, - transfer_proof_receiver, encrypted_signature_sender, cooperative_xmr_redeem_sender, quote_sender, + queued_transfer_proof_sender, }; Ok((event_loop, handle)) } pub async fn run(mut self) { - match self.swarm.dial(DialOpts::from(self.alice_peer_id)) { - Ok(()) => {} - Err(e) => { - tracing::error!("Failed to initiate dial to Alice: {:?}", e); - return; - } - } - loop { // Note: We are making very elaborate use of `select!` macro's feature here. Make sure to read the documentation thoroughly: https://docs.rs/tokio/1.4.0/tokio/macro.select.html tokio::select! { swarm_event = self.swarm.select_next_some() => { match swarm_event { SwarmEvent::Behaviour(OutEvent::QuoteReceived { id, response }) => { + tracing::trace!( + %id, + "Received quote" + ); + if let Some(responder) = self.inflight_quote_requests.remove(&id) { let _ = responder.respond(Ok(response)); } } - SwarmEvent::Behaviour(OutEvent::SwapSetupCompleted(response)) => { - if let Some(responder) = self.inflight_swap_setup.take() { - let _ = responder.respond(*response); + SwarmEvent::Behaviour(OutEvent::SwapSetupCompleted { peer, swap_id, result }) => { + tracing::trace!( + %peer, + "Processing swap setup completion" + ); + + if let Some(responder) = self.inflight_swap_setup.remove(&(peer, swap_id)) { + let _ = responder.respond(*result); } } SwarmEvent::Behaviour(OutEvent::TransferProofReceived { msg, channel, peer }) => { + tracing::trace!( + %peer, + %msg.swap_id, + "Received transfer proof" + ); + let swap_id = msg.swap_id; - if swap_id == self.swap_id { - if peer != self.alice_peer_id { + // Check if we have a registered handler for this swap + if let Some((expected_peer_id, sender)) = self.registered_swap_handlers.get(&swap_id) { + // Ensure the transfer proof is coming from the expected peer + if peer != *expected_peer_id { tracing::warn!( - %swap_id, - "Ignoring malicious transfer proof from {}, expected to receive it from {}", - peer, - self.alice_peer_id); - continue; + %swap_id, + "Ignoring malicious transfer proof from {}, expected to receive it from {}", + peer, + expected_peer_id); + continue; } - // Immediately acknowledge if we've already processed this transfer proof - // This handles the case where Alice didn't receive our previous acknowledgment - // and is retrying sending the transfer proof - if let Ok(state) = self.db.get_state(swap_id).await { - let state: BobState = state.try_into() - .expect("Bobs database only contains Bob states"); - - if has_already_processed_transfer_proof(&state) { - tracing::warn!("Received transfer proof for swap {} but we are already in state {}. Acknowledging immediately. Alice most likely did not receive the acknowledgment when we sent it before", swap_id, state); - - // We set this to a future that will resolve immediately, and returns the channel - // This will be resolved in the next iteration of the event loop, and a response will be sent to Alice - self.pending_transfer_proof = OptionFuture::from(Some(async move { - channel - }.boxed())); - - continue; + // Send the transfer proof to the registered handler + match sender.send(msg.tx_lock_proof) { + Ok(mut responder) => { + // Insert a future that will resolve when the handle "takes the transfer proof out" + self.pending_transfer_proof_acks.push(async move { + let _ = responder.recv().await; + (swap_id, channel) + }.boxed()); } - } - - let mut responder = match self.transfer_proof_sender.send(msg.tx_lock_proof).await { - Ok(responder) => responder, Err(e) => { - tracing::warn!("Failed to pass on transfer proof: {:#}", e); - continue; - } - }; - - self.pending_transfer_proof = OptionFuture::from(Some(async move { - let _ = responder.recv().await; - - channel - }.boxed())); - }else { - // Check if the transfer proof is sent from the correct peer and if we have a record of the swap - match self.db.get_peer_id(swap_id).await { - // We have a record of the swap - Ok(buffer_swap_alice_peer_id) => { - if buffer_swap_alice_peer_id == self.alice_peer_id { - // Save transfer proof in the database such that we can process it later when we resume the swap - match self.db.insert_buffered_transfer_proof(swap_id, msg.tx_lock_proof).await { - Ok(_) => { - tracing::info!("Received transfer proof for swap {} while running swap {}. Buffering this transfer proof in the database for later retrieval", swap_id, self.swap_id); - let _ = self.swarm.behaviour_mut().transfer_proof.send_response(channel, ()); - } - Err(e) => { - tracing::error!("Failed to buffer transfer proof for swap {}: {:#}", swap_id, e); - } - }; - }else { - tracing::warn!( - %swap_id, - "Ignoring malicious transfer proof from {}, expected to receive it from {}", - self.swap_id, - buffer_swap_alice_peer_id); - } - }, - // We do not have a record of the swap or an error occurred while retrieving the peer id of Alice - Err(e) => { - if let Some(sqlx::Error::RowNotFound) = e.downcast_ref::() { - tracing::warn!("Ignoring transfer proof for swap {} while running swap {}. We do not have a record of this swap", swap_id, self.swap_id); - } else { - tracing::error!("Ignoring transfer proof for swap {} while running swap {}. Failed to retrieve the peer id of Alice for the corresponding swap: {:#}", swap_id, self.swap_id, e); - } + tracing::warn!( + %swap_id, + %peer, + error = ?e, + "Failed to pass transfer proof to registered handler" + ); } } + + continue; + } + + // Immediately acknowledge if we've already processed this transfer proof + // This handles the case where Alice didn't receive our previous acknowledgment + // and is retrying sending the transfer proof + match should_acknowledge_transfer_proof(self.db.clone(), swap_id, peer).await { + Ok(true) => { + // We set this to a future that will resolve immediately, and returns the channel + // This will be resolved in the next iteration of the event loop, and a response will be sent to Alice + self.pending_transfer_proof_acks.push(async move { + (swap_id, channel) + }.boxed()); + + // Skip evaluation of whether we should buffer the transfer proof + // if we already acknowledged the transfer proof + continue; + } + // TODO: Maybe we should log here? + Ok(false) => {} + Err(error) => { + tracing::warn!( + %swap_id, + %peer, + error = ?error, + "Failed to evaluate if we should acknowledge the transfer proof, we will not respond at all" + ); + } + } + + // Check if we should buffer the transfer proof + if let Err(error) = buffer_transfer_proof_if_needed(self.db.clone(), swap_id, peer, msg.tx_lock_proof).await { + tracing::warn!( + %swap_id, + %peer, + error = ?error, + "Failed to buffer transfer proof" + ); } } SwarmEvent::Behaviour(OutEvent::EncryptedSignatureAcknowledged { id }) => { @@ -239,30 +279,21 @@ impl EventLoop { tracing::warn!(%peer, err = ?error, "Communication error"); return; } - SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } if peer_id == self.alice_peer_id => { - tracing::info!(peer_id = %endpoint.get_remote_address(), "Connected to Alice"); + SwarmEvent::ConnectionEstablished { peer_id: _, endpoint, .. } => { + tracing::info!(peer_id = %endpoint.get_remote_address(), "Connected to peer"); } - SwarmEvent::Dialing { peer_id: Some(alice_peer_id), connection_id } if alice_peer_id == self.alice_peer_id => { - tracing::debug!(%alice_peer_id, %connection_id, "Dialing Alice"); + SwarmEvent::Dialing { peer_id: Some(peer_id), connection_id } => { + tracing::debug!(%peer_id, %connection_id, "Dialing peer"); } - SwarmEvent::ConnectionClosed { peer_id, endpoint, num_established, cause: Some(error), connection_id } if peer_id == self.alice_peer_id && num_established == 0 => { - tracing::warn!(peer_id = %endpoint.get_remote_address(), cause = ?error, %connection_id, "Lost connection to Alice"); - - if let Some(duration) = self.swarm.behaviour_mut().redial.until_next_redial() { - tracing::info!(seconds_until_next_redial = %duration.as_secs(), "Waiting for next redial attempt"); - } + SwarmEvent::ConnectionClosed { peer_id: _, endpoint, num_established, cause: Some(error), connection_id } if num_established == 0 => { + tracing::warn!(peer_id = %endpoint.get_remote_address(), cause = ?error, %connection_id, "Lost connection to peer"); } - SwarmEvent::ConnectionClosed { peer_id, num_established, cause: None, .. } if peer_id == self.alice_peer_id && num_established == 0 => { + SwarmEvent::ConnectionClosed { peer_id, num_established, cause: None, .. } if num_established == 0 => { // no error means the disconnection was requested - tracing::info!("Successfully closed connection to Alice"); - return; + tracing::info!(%peer_id, "Successfully closed connection to peer"); } - SwarmEvent::OutgoingConnectionError { peer_id: Some(alice_peer_id), error, connection_id } if alice_peer_id == self.alice_peer_id => { - tracing::warn!(%alice_peer_id, %connection_id, ?error, "Failed to connect to Alice"); - - if let Some(duration) = self.swarm.behaviour_mut().redial.until_next_redial() { - tracing::info!(seconds_until_next_redial = %duration.as_secs(), "Waiting for next redial attempt"); - } + SwarmEvent::OutgoingConnectionError { peer_id: Some(peer_id), error, connection_id } => { + tracing::warn!(%peer_id, %connection_id, ?error, "Outgoing connection error to peer"); } SwarmEvent::Behaviour(OutEvent::OutboundRequestResponseFailure {peer, error, request_id, protocol}) => { tracing::error!( @@ -299,98 +330,150 @@ impl EventLoop { %request_id, ?error, %protocol, - "Failed to receive request-response request from peer"); + "Failed to receive or send response for request-response request from peer"); + } + SwarmEvent::Behaviour(OutEvent::Redial(redial::Event::ScheduledRedial { peer, next_dial_in })) => { + tracing::trace!( + %peer, + seconds_until_next_redial = %next_dial_in.as_secs(), + "Scheduled redial for peer" + ); } _ => {} } }, // Handle to-be-sent outgoing requests for all our network protocols. - Some(((), responder)) = self.quote_requests.next().fuse() => { - let id = self.swarm.behaviour_mut().quote.send_request(&self.alice_peer_id, ()); - self.inflight_quote_requests.insert(id, responder); + Some((peer_id, responder)) = self.quote_requests.next().fuse() => { + let outbound_request_id = self.swarm.behaviour_mut().quote.send_request(&peer_id, ()); + self.inflight_quote_requests.insert(outbound_request_id, responder); + + tracing::trace!( + %peer_id, + %outbound_request_id, + "Dispatching outgoing quote request" + ); }, - Some((tx_redeem_encsig, responder)) = self.encrypted_signatures_requests.next().fuse() => { + Some(((peer_id, swap_id, tx_redeem_encsig), responder)) = self.encrypted_signatures_requests.next().fuse() => { let request = encrypted_signature::Request { - swap_id: self.swap_id, + swap_id, tx_redeem_encsig }; - let id = self.swarm.behaviour_mut().encrypted_signature.send_request(&self.alice_peer_id, request); - self.inflight_encrypted_signature_requests.insert(id, responder); + let outbound_request_id = self.swarm.behaviour_mut().encrypted_signature.send_request(&peer_id, request); + self.inflight_encrypted_signature_requests.insert(outbound_request_id, responder); + + tracing::trace!( + %peer_id, + %swap_id, + %outbound_request_id, + "Dispatching outgoing encrypted signature" + ); }, - Some((_, responder)) = self.cooperative_xmr_redeem_requests.next().fuse() => { - let id = self.swarm.behaviour_mut().cooperative_xmr_redeem.send_request(&self.alice_peer_id, Request { - swap_id: self.swap_id + Some(((peer_id, swap_id), responder)) = self.cooperative_xmr_redeem_requests.next().fuse() => { + let outbound_request_id = self.swarm.behaviour_mut().cooperative_xmr_redeem.send_request(&peer_id, Request { + swap_id }); - self.inflight_cooperative_xmr_redeem_requests.insert(id, responder); + self.inflight_cooperative_xmr_redeem_requests.insert(outbound_request_id, responder); + + tracing::trace!( + %peer_id, + %swap_id, + %outbound_request_id, + "Dispatching outgoing cooperative xmr redeem request" + ); }, - // We use `self.is_connected_to_alice` as a guard to "buffer" requests until we are connected. - // because the protocol does not dial Alice itself - // (unlike request-response above) - Some((swap, responder)) = self.execution_setup_requests.next().fuse(), if self.is_connected_to_alice() => { - self.swarm.behaviour_mut().swap_setup.start(self.alice_peer_id, swap).await; - self.inflight_swap_setup = Some(responder); - }, + // Instruct the swap setup behaviour to do a swap setup request + // The behaviour will instruct the swarm to dial Alice, so we don't need to check if we are connected + Some(((alice_peer_id, swap), responder)) = self.execution_setup_requests.next().fuse() => { + let swap_id = swap.swap_id.clone(); + self.swarm.behaviour_mut().swap_setup.start(alice_peer_id, swap).await; + self.inflight_swap_setup.insert((alice_peer_id, swap_id), responder); + + tracing::trace!( + %alice_peer_id, + "Dispatching outgoing execution setup request" + ); + }, // Send an acknowledgement to Alice once the EventLoopHandle has processed a received transfer proof - // We use `self.is_connected_to_alice` as a guard to "buffer" requests until we are connected. - // - // Why do we do this here but not for the other request-response channels? - // This is the only request, we don't have a retry mechanism for. We lazily send this. - Some(response_channel) = &mut self.pending_transfer_proof, if self.is_connected_to_alice() => { + Some((swap_id, response_channel)) = self.pending_transfer_proof_acks.next() => { + tracing::trace!( + %swap_id, + "Dispatching outgoing transfer proof acknowledgment"); + + // We do not check if we are connected to Alice here because responding on a channel + // which has been dropped works even if a new connections has been established since + // will not work because because a channel is always bounded to one connection if self.swarm.behaviour_mut().transfer_proof.send_response(response_channel, ()).is_err() { tracing::warn!("Failed to send acknowledgment to Alice that we have received the transfer proof"); } else { tracing::info!("Sent acknowledgment to Alice that we have received the transfer proof"); - self.pending_transfer_proof = OptionFuture::from(None); } }, + + Some(((swap_id, peer_id, sender), responder)) = self.queued_swap_handlers.next().fuse() => { + tracing::trace!(%swap_id, %peer_id, "Registering swap handle for a swap internally inside the event loop"); + + // This registers the swap_id -> peer_id and swap_id -> transfer_proof_sender + self.registered_swap_handlers.insert(swap_id, (peer_id, sender)); + + // Instruct the swarm to contineously redial the peer + // TODO: We must remove it again once the swap is complete, otherwise we will redial indefinitely + self.swarm.behaviour_mut().redial.add_peer(peer_id); + + // Acknowledge the registration + let _ = responder.respond(()); + }, } } } - - fn is_connected_to_alice(&self) -> bool { - self.swarm.is_connected(&self.alice_peer_id) - } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct EventLoopHandle { - /// When a NewSwap object is sent into this channel, the EventLoop will: - /// 1. Trigger the swap setup protocol with Alice to negotiate the swap parameters + /// When a (PeerId, NewSwap) tuple is sent into this channel, the EventLoop will: + /// 1. Trigger the swap setup protocol with the specified peer to negotiate the swap parameters /// 2. Return the resulting State2 if successful /// 3. Return an anyhow error if the request fails - execution_setup_sender: bmrng::RequestSender>, + execution_setup_sender: + bmrng::unbounded::UnboundedRequestSender<(PeerId, NewSwap), Result>, - /// Receiver for incoming Monero transfer proofs from Alice. - /// When a proof is received, we process it and acknowledge receipt back to the EventLoop - /// The EventLoop will then send an acknowledgment back to Alice over the network - transfer_proof_receiver: bmrng::RequestReceiver, - - /// When an encrypted signature is sent into this channel, the EventLoop will: - /// 1. Send the encrypted signature to Alice over the network - /// 2. Return Ok(()) if Alice acknowledges receipt, or + /// When a (PeerId, Uuid, EncryptedSignature) tuple is sent into this channel, the EventLoop will: + /// 1. Send the encrypted signature to the specified peer over the network + /// 2. Return Ok(()) if the peer acknowledges receipt, or /// 3. Return an OutboundFailure error if the request fails - encrypted_signature_sender: - bmrng::RequestSender>, + encrypted_signature_sender: bmrng::unbounded::UnboundedRequestSender< + (PeerId, Uuid, EncryptedSignature), + Result<(), OutboundFailure>, + >, - /// When a () is sent into this channel, the EventLoop will: - /// 1. Request a price quote from Alice + /// When a PeerId is sent into this channel, the EventLoop will: + /// 1. Request a price quote from the specified peer /// 2. Return the quote if successful /// 3. Return an OutboundFailure error if the request fails - quote_sender: bmrng::RequestSender<(), Result>, + quote_sender: + bmrng::unbounded::UnboundedRequestSender>, - /// When a () is sent into this channel, the EventLoop will: - /// 1. Request Alice's cooperation in redeeming the Monero - /// 2. Return the a response object (Fullfilled or Rejected), if the network request is successful + /// When a (PeerId, Uuid) tuple is sent into this channel, the EventLoop will: + /// 1. Request the specified peer's cooperation in redeeming the Monero for the given swap + /// 2. Return a response object (Fullfilled or Rejected), if the network request is successful /// The Fullfilled object contains the keys required to redeem the Monero /// 3. Return an OutboundFailure error if the network request fails - cooperative_xmr_redeem_sender: bmrng::RequestSender< - (), + cooperative_xmr_redeem_sender: bmrng::unbounded::UnboundedRequestSender< + (PeerId, Uuid), Result, >, + + queued_transfer_proof_sender: bmrng::unbounded::UnboundedRequestSender< + ( + Uuid, + PeerId, + bmrng::unbounded::UnboundedRequestSender, + ), + (), + >, } impl EventLoopHandle { @@ -401,14 +484,44 @@ impl EventLoopHandle { .build() } - pub async fn setup_swap(&mut self, swap: NewSwap) -> Result { - tracing::debug!(swap = ?swap, "Sending swap setup request"); + /// Creates a SwapEventLoopHandle for a specific swap + /// This registers the swap's transfer proof receiver with the event loop + pub async fn swap_handle( + &mut self, + peer_id: PeerId, + swap_id: Uuid, + ) -> Result { + // Create a channel for sending transfer proofs from the `EventLoop` to the `SwapEventLoopHandle` + // + // The sender is stored in the `EventLoop`. The receiver is stored in the `SwapEventLoopHandle`. + let (transfer_proof_sender, transfer_proof_receiver) = bmrng::unbounded_channel(); + + // Register this sender in the `EventLoop` + // It is put into the queue and then later moved into `registered_transfer_proof_senders` + // + // We use `send(...) instead of send_receive(...)` because the event loop needs to be running for this to respond + self.queued_transfer_proof_sender + .send((swap_id, peer_id, transfer_proof_sender)) + .context("Failed to register transfer proof sender with event loop")?; + + Ok(SwapEventLoopHandle { + handle: self.clone(), + peer_id, + swap_id, + transfer_proof_receiver: Some(transfer_proof_receiver), + }) + } + + pub async fn setup_swap(&mut self, peer_id: PeerId, swap: NewSwap) -> Result { + tracing::debug!(swap = ?swap, %peer_id, "Sending swap setup request"); let backoff = Self::create_retry_config(EXECUTION_SETUP_PROTOCOL_TIMEOUT); backoff::future::retry_notify(backoff, || async { - match self.execution_setup_sender.send_receive(swap.clone()).await { - Ok(Ok(state2)) => Ok(state2), + match self.execution_setup_sender.send_receive((peer_id, swap.clone())).await { + Ok(Ok(state2)) => { + Ok(state2) + } // These are errors thrown by the swap_setup/bob behaviour Ok(Err(err)) => { Err(backoff::Error::transient(err.context("A network error occurred while setting up the swap"))) @@ -428,33 +541,19 @@ impl EventLoopHandle { error = ?err, "Failed to setup swap. We will retry in {} seconds", wait_time.as_secs() - ) + ); }) .await .context("Failed to setup swap after retries") } - pub async fn recv_transfer_proof(&mut self) -> Result { - let (transfer_proof, responder) = self - .transfer_proof_receiver - .recv() - .await - .context("Failed to receive transfer proof")?; - - responder - .respond(()) - .context("Failed to acknowledge receipt of transfer proof")?; - - Ok(transfer_proof) - } - - pub async fn request_quote(&mut self) -> Result { - tracing::debug!("Requesting quote"); + pub async fn request_quote(&mut self, peer_id: PeerId) -> Result { + tracing::debug!(%peer_id, "Requesting quote"); let backoff = Self::create_retry_config(REQUEST_RESPONSE_PROTOCOL_TIMEOUT); backoff::future::retry_notify(backoff, || async { - match self.quote_sender.send_receive(()).await { + match self.quote_sender.send_receive(peer_id).await { Ok(Ok(quote)) => Ok(quote), Ok(Err(err)) => { Err(backoff::Error::transient(anyhow!(err).context("A network error occurred while requesting a quote"))) @@ -474,13 +573,17 @@ impl EventLoopHandle { .context("Failed to request quote after retries") } - pub async fn request_cooperative_xmr_redeem(&mut self) -> Result { - tracing::debug!("Requesting cooperative XMR redeem"); + pub async fn request_cooperative_xmr_redeem( + &mut self, + peer_id: PeerId, + swap_id: Uuid, + ) -> Result { + tracing::debug!(%peer_id, %swap_id, "Requesting cooperative XMR redeem"); let backoff = Self::create_retry_config(REQUEST_RESPONSE_PROTOCOL_TIMEOUT); backoff::future::retry_notify(backoff, || async { - match self.cooperative_xmr_redeem_sender.send_receive(()).await { + match self.cooperative_xmr_redeem_sender.send_receive((peer_id, swap_id)).await { Ok(Ok(response)) => Ok(response), Ok(Err(err)) => { Err(backoff::Error::transient(anyhow!(err).context("A network error occurred while requesting cooperative XMR redeem"))) @@ -500,8 +603,13 @@ impl EventLoopHandle { .context("Failed to request cooperative XMR redeem after retries") } - pub async fn send_encrypted_signature(&mut self, tx_redeem_encsig: EncryptedSignature) { - tracing::debug!("Sending encrypted signature"); + pub async fn send_encrypted_signature( + &mut self, + peer_id: PeerId, + swap_id: Uuid, + tx_redeem_encsig: EncryptedSignature, + ) -> () { + tracing::debug!(%peer_id, %swap_id, "Sending encrypted signature"); // We will retry indefinitely until we succeed let backoff = backoff::ExponentialBackoffBuilder::new() @@ -510,7 +618,7 @@ impl EventLoopHandle { .build(); backoff::future::retry_notify(backoff, || async { - match self.encrypted_signature_sender.send_receive(tx_redeem_encsig.clone()).await { + match self.encrypted_signature_sender.send_receive((peer_id, swap_id, tx_redeem_encsig.clone())).await { Ok(Ok(_)) => Ok(()), Ok(Err(err)) => { Err(backoff::Error::transient(anyhow!(err).context("A network error occurred while sending the encrypted signature"))) @@ -530,3 +638,101 @@ impl EventLoopHandle { .expect("we should never run out of retries when sending an encrypted signature") } } + +#[derive(Debug)] +pub struct SwapEventLoopHandle { + handle: EventLoopHandle, + peer_id: PeerId, + swap_id: Uuid, + transfer_proof_receiver: + Option>, +} + +impl SwapEventLoopHandle { + pub async fn recv_transfer_proof(&mut self) -> Result { + let receiver = self + .transfer_proof_receiver + .as_mut() + .context("Transfer proof receiver not available")?; + + let (transfer_proof, responder) = receiver + .recv() + .await + .context("Failed to receive transfer proof")?; + + responder + .respond(()) + .context("Failed to acknowledge receipt of transfer proof")?; + + Ok(transfer_proof) + } + + pub async fn send_encrypted_signature(&mut self, tx_redeem_encsig: EncryptedSignature) -> () { + self.handle + .send_encrypted_signature(self.peer_id, self.swap_id, tx_redeem_encsig) + .await + } + + pub async fn request_cooperative_xmr_redeem(&mut self) -> Result { + self.handle + .request_cooperative_xmr_redeem(self.peer_id, self.swap_id) + .await + } + + pub async fn setup_swap(&mut self, swap: NewSwap) -> Result { + self.handle.setup_swap(self.peer_id, swap).await + } + + pub async fn request_quote(&mut self) -> Result { + self.handle.request_quote(self.peer_id).await + } +} + +/// Returns Ok(true) if we should acknowledge the transfer proof +/// +/// - Checks if the peer id is the expected peer id +/// - Checks if the state indicates that we have already processed the transfer proof +async fn should_acknowledge_transfer_proof( + db: Arc, + swap_id: Uuid, + peer_id: PeerId, +) -> Result { + let expected_peer_id = db.get_peer_id(swap_id).await.context( + "Failed to get peer id for swap to check if we should acknowledge the transfer proof", + )?; + + // If the peer id is not the expected peer id, we should not acknowledge the transfer proof + // This is to prevent malicious requests + if expected_peer_id != peer_id { + bail!("Expected peer id {} but got {}", expected_peer_id, peer_id); + } + + let state = db.get_state(swap_id).await.context( + "Failed to get state for swap to check if we should acknowledge the transfer proof", + )?; + let state: BobState = state.try_into().context( + "Failed to convert state to BobState to check if we should acknowledge the transfer proof", + )?; + + Ok(has_already_processed_transfer_proof(&state)) +} + +/// Buffers the transfer proof in the database if its from the expected peer +async fn buffer_transfer_proof_if_needed( + db: Arc, + swap_id: Uuid, + peer_id: PeerId, + transfer_proof: monero::TransferProof, +) -> Result<()> { + let expected_peer_id = db.get_peer_id(swap_id).await.context( + "Failed to get peer id for swap to check if we should buffer the transfer proof", + )?; + + if expected_peer_id != peer_id { + bail!("Expected peer id {} but got {}", expected_peer_id, peer_id); + } + + db.insert_buffered_transfer_proof(swap_id, transfer_proof) + .await + .context("Failed to buffer transfer proof in database") +} diff --git a/swap/src/common/tracing_util.rs b/swap/src/common/tracing_util.rs index 3b127a27..7db59d41 100644 --- a/swap/src/common/tracing_util.rs +++ b/swap/src/common/tracing_util.rs @@ -220,13 +220,16 @@ mod crates { ]; pub const OUR_CRATES: &[&str] = &[ - "swap", + // Library crates "swap_p2p", - "asb", "swap_env", + "swap_core", "swap_fs", "swap_serde", "monero_sys", + // Binary crates + "swap", + "asb", "unstoppableswap_gui_rs", ]; diff --git a/swap/src/network.rs b/swap/src/network.rs index a50a3367..e2c7adda 100644 --- a/swap/src/network.rs +++ b/swap/src/network.rs @@ -1,11 +1,11 @@ pub use swap_p2p::protocols::cooperative_xmr_redeem_after_punish; pub use swap_p2p::protocols::encrypted_signature; pub use swap_p2p::protocols::quote; +pub use swap_p2p::protocols::redial; pub use swap_p2p::protocols::rendezvous; pub use swap_p2p::protocols::swap_setup; pub use swap_p2p::protocols::transfer_proof; -pub mod redial; pub mod swarm; pub mod transport; diff --git a/swap/src/network/redial.rs b/swap/src/network/redial.rs deleted file mode 100644 index 0b8ba915..00000000 --- a/swap/src/network/redial.rs +++ /dev/null @@ -1,135 +0,0 @@ -use backoff::backoff::Backoff; -use backoff::ExponentialBackoff; -use futures::future::FutureExt; -use libp2p::core::Multiaddr; -use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; -use libp2p::swarm::{NetworkBehaviour, ToSwarm}; -use libp2p::PeerId; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::Duration; -use tokio::time::{Instant, Sleep}; -use void::Void; - -/// A [`NetworkBehaviour`] that tracks whether we are connected to the given -/// peer and attempts to re-establish a connection with an exponential backoff -/// if we lose the connection. -pub struct Behaviour { - /// The peer we are interested in. - peer: PeerId, - /// If present, tracks for how long we need to sleep until we dial again. - sleep: Option>>, - /// Tracks the current backoff state. - backoff: ExponentialBackoff, -} - -impl Behaviour { - pub fn new(peer: PeerId, interval: Duration, max_interval: Duration) -> Self { - Self { - peer, - sleep: None, - backoff: ExponentialBackoff { - initial_interval: interval, - current_interval: interval, - max_interval, - max_elapsed_time: None, // We never give up on re-dialling - ..ExponentialBackoff::default() - }, - } - } - - pub fn until_next_redial(&self) -> Option { - let until_next_redial = self - .sleep - .as_ref()? - .deadline() - .checked_duration_since(Instant::now())?; - - Some(until_next_redial) - } -} - -impl NetworkBehaviour for Behaviour { - type ConnectionHandler = libp2p::swarm::dummy::ConnectionHandler; - type ToSwarm = (); - - fn handle_established_inbound_connection( - &mut self, - _connection_id: libp2p::swarm::ConnectionId, - peer: PeerId, - _local_addr: &Multiaddr, - _remote_addr: &Multiaddr, - ) -> Result, libp2p::swarm::ConnectionDenied> { - // We establish an inbound connection to the peer we are interested in. - // We stop re-dialling. - // Reset the backoff state to start with the initial interval again once we disconnect again - if peer == self.peer { - self.backoff.reset(); - self.sleep = None; - } - Ok(Self::ConnectionHandler {}) - } - - fn handle_established_outbound_connection( - &mut self, - _connection_id: libp2p::swarm::ConnectionId, - peer: PeerId, - _addr: &Multiaddr, - _role_override: libp2p::core::Endpoint, - ) -> Result, libp2p::swarm::ConnectionDenied> { - // We establish an outbound connection to the peer we are interested in. - // We stop re-dialling. - // Reset the backoff state to start with the initial interval again once we disconnect again - if peer == self.peer { - self.backoff.reset(); - self.sleep = None; - } - Ok(Self::ConnectionHandler {}) - } - - fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm<'_>) { - let redial = match event { - libp2p::swarm::FromSwarm::ConnectionClosed(e) if e.peer_id == self.peer => true, - libp2p::swarm::FromSwarm::DialFailure(e) if e.peer_id == Some(self.peer) => true, - _ => false, - }; - - if redial && self.sleep.is_none() { - self.sleep = Some(Box::pin(tokio::time::sleep(self.backoff.initial_interval))); - tracing::info!(seconds_until_next_redial = %self.until_next_redial().expect("We initialize the backoff without max_elapsed_time").as_secs(), "Waiting for next redial attempt"); - } - } - - fn poll(&mut self, cx: &mut Context<'_>) -> std::task::Poll> { - let sleep = match self.sleep.as_mut() { - None => return Poll::Pending, // early exit if we shouldn't be re-dialling - Some(future) => future, - }; - - futures::ready!(sleep.poll_unpin(cx)); - - let next_dial_in = match self.backoff.next_backoff() { - Some(next_dial_in) => next_dial_in, - None => { - unreachable!("The backoff should never run out of attempts"); - } - }; - - self.sleep = Some(Box::pin(tokio::time::sleep(next_dial_in))); - - Poll::Ready(ToSwarm::Dial { - opts: DialOpts::peer_id(self.peer) - .condition(PeerCondition::Disconnected) - .build(), - }) - } - - fn on_connection_handler_event( - &mut self, - _peer_id: PeerId, - _connection_id: libp2p::swarm::ConnectionId, - _event: libp2p::swarm::THandlerOutEvent, - ) { - unreachable!("The re-dial dummy connection handler does not produce any events"); - } -} diff --git a/swap/src/protocol/bob.rs b/swap/src/protocol/bob.rs index d19abaca..5a6ba065 100644 --- a/swap/src/protocol/bob.rs +++ b/swap/src/protocol/bob.rs @@ -20,7 +20,7 @@ pub mod swap; pub struct Swap { pub state: BobState, - pub event_loop_handle: cli::EventLoopHandle, + pub event_loop_handle: cli::SwapEventLoopHandle, pub db: Arc, pub bitcoin_wallet: Arc, pub monero_wallet: Arc, @@ -38,7 +38,7 @@ impl Swap { bitcoin_wallet: Arc, monero_wallet: Arc, env_config: env::Config, - event_loop_handle: cli::EventLoopHandle, + event_loop_handle: cli::SwapEventLoopHandle, monero_receive_pool: MoneroAddressPool, bitcoin_change_address: bitcoin::Address, btc_amount: bitcoin::Amount, @@ -68,7 +68,7 @@ impl Swap { bitcoin_wallet: Arc, monero_wallet: Arc, env_config: env::Config, - event_loop_handle: cli::EventLoopHandle, + event_loop_handle: cli::SwapEventLoopHandle, monero_receive_pool: MoneroAddressPool, ) -> Result { let state = db.get_state(id).await?.try_into()?; diff --git a/swap/src/protocol/bob/swap.rs b/swap/src/protocol/bob/swap.rs index 5c0b9080..46c26314 100644 --- a/swap/src/protocol/bob/swap.rs +++ b/swap/src/protocol/bob/swap.rs @@ -1,6 +1,6 @@ use crate::cli::api::tauri_bindings::LockBitcoinDetails; use crate::cli::api::tauri_bindings::{TauriEmitter, TauriHandle, TauriSwapProgressEvent}; -use crate::cli::EventLoopHandle; +use crate::cli::SwapEventLoopHandle; use crate::common::retry; use crate::monero; use crate::monero::MoneroAddressPool; @@ -8,7 +8,7 @@ use crate::network::cooperative_xmr_redeem_after_punish::Response::{Fullfilled, use crate::network::swap_setup::bob::NewSwap; use crate::protocol::bob::*; use crate::protocol::{bob, Database}; -use anyhow::{bail, Context as AnyContext, Result}; +use anyhow::{Context as AnyContext, Result}; use std::sync::Arc; use std::time::Duration; use swap_core::bitcoin::{ExpiredTimelocks, TxCancel, TxRefund}; @@ -90,7 +90,7 @@ pub async fn run_until( async fn next_state( swap_id: Uuid, state: BobState, - event_loop_handle: &mut EventLoopHandle, + event_loop_handle: &mut SwapEventLoopHandle, db: Arc, bitcoin_wallet: Arc, monero_wallet: Arc, diff --git a/swap/tests/harness/mod.rs b/swap/tests/harness/mod.rs index a45262b8..af27448b 100644 --- a/swap/tests/harness/mod.rs +++ b/swap/tests/harness/mod.rs @@ -57,7 +57,7 @@ where let cli = Cli::default(); tracing_subscriber::fmt() - .with_env_filter("info,swap=debug,monero_harness=debug,monero_rpc=debug,bitcoin_harness=info,testcontainers=info,monero_cpp=info,monero_sys=debug") // add `reqwest::connect::verbose=trace` if you want to logs of the RPC clients + .with_env_filter("info,swap=trace,swap_p2p=trace,monero_harness=debug,monero_rpc=debug,bitcoin_harness=info,testcontainers=info,monero_cpp=info,monero_sys=debug") // add `reqwest::connect::verbose=trace` if you want to logs of the RPC clients .with_test_writer() .init(); @@ -525,7 +525,9 @@ impl BobParams { } let db = Arc::new(SqliteDatabase::open(&self.db_path, AccessMode::ReadWrite).await?); - let (event_loop, handle) = self.new_eventloop(swap_id, db.clone()).await?; + let (event_loop, mut handle) = self.new_eventloop(db.clone()).await?; + + let swap_handle = handle.swap_handle(self.alice_peer_id, swap_id).await?; let swap = bob::Swap::from_db( db.clone(), @@ -533,7 +535,7 @@ impl BobParams { self.bitcoin_wallet.clone(), self.monero_wallet.clone(), self.env_config, - handle, + swap_handle, self.monero_wallet .main_wallet() .await @@ -560,17 +562,19 @@ impl BobParams { } let db = Arc::new(SqliteDatabase::open(&self.db_path, AccessMode::ReadWrite).await?); - let (event_loop, handle) = self.new_eventloop(swap_id, db.clone()).await?; + let (event_loop, mut handle) = self.new_eventloop(db.clone()).await?; db.insert_peer_id(swap_id, self.alice_peer_id).await?; + let swap_handle = handle.swap_handle(self.alice_peer_id, swap_id).await?; + let swap = bob::Swap::new( db, swap_id, self.bitcoin_wallet.clone(), self.monero_wallet.clone(), self.env_config, - handle, + swap_handle, self.monero_wallet .main_wallet() .await @@ -587,13 +591,11 @@ impl BobParams { pub async fn new_eventloop( &self, - swap_id: Uuid, db: Arc, ) -> Result<(cli::EventLoop, cli::EventLoopHandle)> { let identity = self.seed.derive_libp2p_identity(); let behaviour = cli::Behaviour::new( - self.alice_peer_id, self.env_config, self.bitcoin_wallet.clone(), (identity.clone(), XmrBtcNamespace::Testnet), @@ -601,7 +603,7 @@ impl BobParams { let mut swarm = swarm::cli(identity.clone(), None, behaviour).await?; swarm.add_peer_address(self.alice_peer_id, self.alice_address.clone()); - cli::EventLoop::new(swap_id, swarm, self.alice_peer_id, db.clone()) + cli::EventLoop::new(swarm, db.clone()) } }