Listen on all swarm events instead of just behaviour events

This commit is contained in:
Thomas Eizinger 2021-03-18 15:54:33 +11:00
parent 2200fce3f3
commit 804b34f6b0
No known key found for this signature in database
GPG Key ID: 651AC83A6C6C8B96
2 changed files with 40 additions and 18 deletions

View File

@ -10,6 +10,7 @@ use anyhow::{bail, Context, Result};
use futures::future;
use futures::future::{BoxFuture, FutureExt};
use futures::stream::{FuturesUnordered, StreamExt};
use libp2p::swarm::SwarmEvent;
use libp2p::{PeerId, Swarm};
use rand::rngs::OsRng;
use std::collections::HashMap;
@ -80,12 +81,12 @@ where
loop {
tokio::select! {
swarm_event = self.swarm.next() => {
swarm_event = self.swarm.next_event() => {
match swarm_event {
OutEvent::ConnectionEstablished(alice) => {
SwarmEvent::Behaviour(OutEvent::ConnectionEstablished(alice)) => {
debug!("Connection Established with {}", alice);
}
OutEvent::SpotPriceRequested { request, channel, peer } => {
SwarmEvent::Behaviour(OutEvent::SpotPriceRequested { request, channel, peer }) => {
let btc = request.btc;
let xmr = match self.handle_spot_price_request(btc, self.monero_wallet.clone()).await {
Ok(xmr) => xmr,
@ -111,7 +112,7 @@ where
}
}
}
OutEvent::QuoteRequested { channel, peer } => {
SwarmEvent::Behaviour(OutEvent::QuoteRequested { channel, peer }) => {
let quote = match self.make_quote(self.max_buy).await {
Ok(quote) => quote,
Err(e) => {
@ -129,13 +130,13 @@ where
}
}
}
OutEvent::ExecutionSetupDone{bob_peer_id, state3} => {
SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone{bob_peer_id, state3}) => {
let _ = self.handle_execution_setup_done(bob_peer_id, *state3).await;
}
OutEvent::TransferProofAcknowledged(peer) => {
SwarmEvent::Behaviour(OutEvent::TransferProofAcknowledged(peer)) => {
trace!(%peer, "Bob acknowledged transfer proof");
}
OutEvent::EncryptedSignatureReceived{ msg, channel, peer } => {
SwarmEvent::Behaviour(OutEvent::EncryptedSignatureReceived{ msg, channel, peer }) => {
match self.recv_encrypted_signature.remove(&peer) {
Some(sender) => {
// this failing just means the receiver is no longer interested ...
@ -148,10 +149,27 @@ where
self.swarm.send_encrypted_signature_ack(channel);
}
OutEvent::ResponseSent => {}
OutEvent::Failure {peer, error} => {
SwarmEvent::Behaviour(OutEvent::ResponseSent) => {}
SwarmEvent::Behaviour(OutEvent::Failure {peer, error}) => {
error!(%peer, "Communication error: {:#}", error);
}
SwarmEvent::ConnectionEstablished { peer_id: peer, endpoint, .. } => {
tracing::debug!(%peer, address = %endpoint.get_remote_address(), "New connection established");
}
SwarmEvent::IncomingConnectionError { send_back_addr: address, error, .. } => {
tracing::warn!(%address, "Failed to set up connection with peer: {}", error);
}
SwarmEvent::ConnectionClosed { peer_id: peer, num_established, endpoint, cause } if num_established == 0 => {
match cause {
Some(error) => {
tracing::warn!(%peer, address = %endpoint.get_remote_address(), "Lost connection: {}", error);
},
None => {
tracing::info!(%peer, address = %endpoint.get_remote_address(), "Successfully closed connection");
}
}
}
_ => {}
}
},
next_transfer_proof = self.send_transfer_proof.next() => {

View File

@ -5,6 +5,7 @@ use crate::protocol::bob::{Behaviour, OutEvent, State0, State2};
use crate::{bitcoin, monero};
use anyhow::{anyhow, bail, Context, Result};
use futures::FutureExt;
use libp2p::swarm::SwarmEvent;
use libp2p::{PeerId, Swarm};
use std::convert::Infallible;
use std::sync::Arc;
@ -180,34 +181,37 @@ impl EventLoop {
pub async fn run(mut self) -> Result<Infallible> {
loop {
tokio::select! {
swarm_event = self.swarm.next().fuse() => {
swarm_event = self.swarm.next_event().fuse() => {
match swarm_event {
OutEvent::ConnectionEstablished(peer_id) => {
SwarmEvent::Behaviour(OutEvent::ConnectionEstablished(peer_id)) => {
let _ = self.conn_established.send(peer_id).await;
}
OutEvent::SpotPriceReceived(msg) => {
SwarmEvent::Behaviour(OutEvent::SpotPriceReceived(msg)) => {
let _ = self.recv_spot_price.send(msg).await;
},
OutEvent::QuoteReceived(msg) => {
SwarmEvent::Behaviour(OutEvent::QuoteReceived(msg)) => {
let _ = self.recv_quote.send(msg).await;
},
OutEvent::ExecutionSetupDone(res) => {
SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone(res)) => {
let _ = self.done_execution_setup.send(res.map(|state|*state)).await;
}
OutEvent::TransferProofReceived{ msg, channel }=> {
SwarmEvent::Behaviour(OutEvent::TransferProofReceived{ msg, channel }) => {
let _ = self.recv_transfer_proof.send(*msg).await;
// Send back empty response so that the request/response protocol completes.
if let Err(error) = self.swarm.transfer_proof.send_response(channel, ()) {
error!("Failed to send Transfer Proof ack: {:?}", error);
}
}
OutEvent::EncryptedSignatureAcknowledged => {
SwarmEvent::Behaviour(OutEvent::EncryptedSignatureAcknowledged) => {
debug!("Alice acknowledged encrypted signature");
}
OutEvent::ResponseSent => {}
OutEvent::CommunicationError(err) => {
SwarmEvent::Behaviour(OutEvent::ResponseSent) => {
}
SwarmEvent::Behaviour(OutEvent::CommunicationError(err)) => {
bail!(err.context("Communication error"))
}
_ => {}
}
},
option = self.dial_alice.recv().fuse() => {