diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index dcc6f638..136453a7 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -10,6 +10,7 @@ use anyhow::{bail, Context, Result}; use futures::future; use futures::future::{BoxFuture, FutureExt}; use futures::stream::{FuturesUnordered, StreamExt}; +use libp2p::swarm::SwarmEvent; use libp2p::{PeerId, Swarm}; use rand::rngs::OsRng; use std::collections::HashMap; @@ -80,12 +81,12 @@ where loop { tokio::select! { - swarm_event = self.swarm.next() => { + swarm_event = self.swarm.next_event() => { match swarm_event { - OutEvent::ConnectionEstablished(alice) => { + SwarmEvent::Behaviour(OutEvent::ConnectionEstablished(alice)) => { debug!("Connection Established with {}", alice); } - OutEvent::SpotPriceRequested { request, channel, peer } => { + SwarmEvent::Behaviour(OutEvent::SpotPriceRequested { request, channel, peer }) => { let btc = request.btc; let xmr = match self.handle_spot_price_request(btc, self.monero_wallet.clone()).await { Ok(xmr) => xmr, @@ -111,7 +112,7 @@ where } } } - OutEvent::QuoteRequested { channel, peer } => { + SwarmEvent::Behaviour(OutEvent::QuoteRequested { channel, peer }) => { let quote = match self.make_quote(self.max_buy).await { Ok(quote) => quote, Err(e) => { @@ -129,13 +130,13 @@ where } } } - OutEvent::ExecutionSetupDone{bob_peer_id, state3} => { + SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone{bob_peer_id, state3}) => { let _ = self.handle_execution_setup_done(bob_peer_id, *state3).await; } - OutEvent::TransferProofAcknowledged(peer) => { + SwarmEvent::Behaviour(OutEvent::TransferProofAcknowledged(peer)) => { trace!(%peer, "Bob acknowledged transfer proof"); } - OutEvent::EncryptedSignatureReceived{ msg, channel, peer } => { + SwarmEvent::Behaviour(OutEvent::EncryptedSignatureReceived{ msg, channel, peer }) => { match self.recv_encrypted_signature.remove(&peer) { Some(sender) => { // this failing just means the receiver is no longer interested ... @@ -148,10 +149,27 @@ where self.swarm.send_encrypted_signature_ack(channel); } - OutEvent::ResponseSent => {} - OutEvent::Failure {peer, error} => { + SwarmEvent::Behaviour(OutEvent::ResponseSent) => {} + SwarmEvent::Behaviour(OutEvent::Failure {peer, error}) => { error!(%peer, "Communication error: {:#}", error); } + SwarmEvent::ConnectionEstablished { peer_id: peer, endpoint, .. } => { + tracing::debug!(%peer, address = %endpoint.get_remote_address(), "New connection established"); + } + SwarmEvent::IncomingConnectionError { send_back_addr: address, error, .. } => { + tracing::warn!(%address, "Failed to set up connection with peer: {}", error); + } + SwarmEvent::ConnectionClosed { peer_id: peer, num_established, endpoint, cause } if num_established == 0 => { + match cause { + Some(error) => { + tracing::warn!(%peer, address = %endpoint.get_remote_address(), "Lost connection: {}", error); + }, + None => { + tracing::info!(%peer, address = %endpoint.get_remote_address(), "Successfully closed connection"); + } + } + } + _ => {} } }, next_transfer_proof = self.send_transfer_proof.next() => { diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index 16472190..fc40805a 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -5,6 +5,7 @@ use crate::protocol::bob::{Behaviour, OutEvent, State0, State2}; use crate::{bitcoin, monero}; use anyhow::{anyhow, bail, Context, Result}; use futures::FutureExt; +use libp2p::swarm::SwarmEvent; use libp2p::{PeerId, Swarm}; use std::convert::Infallible; use std::sync::Arc; @@ -180,34 +181,37 @@ impl EventLoop { pub async fn run(mut self) -> Result { loop { tokio::select! { - swarm_event = self.swarm.next().fuse() => { + swarm_event = self.swarm.next_event().fuse() => { match swarm_event { - OutEvent::ConnectionEstablished(peer_id) => { + SwarmEvent::Behaviour(OutEvent::ConnectionEstablished(peer_id)) => { let _ = self.conn_established.send(peer_id).await; } - OutEvent::SpotPriceReceived(msg) => { + SwarmEvent::Behaviour(OutEvent::SpotPriceReceived(msg)) => { let _ = self.recv_spot_price.send(msg).await; }, - OutEvent::QuoteReceived(msg) => { + SwarmEvent::Behaviour(OutEvent::QuoteReceived(msg)) => { let _ = self.recv_quote.send(msg).await; }, - OutEvent::ExecutionSetupDone(res) => { + SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone(res)) => { let _ = self.done_execution_setup.send(res.map(|state|*state)).await; } - OutEvent::TransferProofReceived{ msg, channel }=> { + SwarmEvent::Behaviour(OutEvent::TransferProofReceived{ msg, channel }) => { let _ = self.recv_transfer_proof.send(*msg).await; // Send back empty response so that the request/response protocol completes. if let Err(error) = self.swarm.transfer_proof.send_response(channel, ()) { error!("Failed to send Transfer Proof ack: {:?}", error); } } - OutEvent::EncryptedSignatureAcknowledged => { + SwarmEvent::Behaviour(OutEvent::EncryptedSignatureAcknowledged) => { debug!("Alice acknowledged encrypted signature"); } - OutEvent::ResponseSent => {} - OutEvent::CommunicationError(err) => { + SwarmEvent::Behaviour(OutEvent::ResponseSent) => { + + } + SwarmEvent::Behaviour(OutEvent::CommunicationError(err)) => { bail!(err.context("Communication error")) } + _ => {} } }, option = self.dial_alice.recv().fuse() => {