Use bmrng to model communicaton of Alice's EventLoop with the handle

This allows us to delay the ACKing of the encrypted signature up until
the swap has actually requested it.

Similarly, it allows us to wait for the ACK of the transfer proof within
the swap before continuing.
This commit is contained in:
Thomas Eizinger 2021-03-30 16:42:54 +11:00
parent 1c47b32681
commit 1b0c29b424
No known key found for this signature in database
GPG Key ID: 651AC83A6C6C8B96
2 changed files with 97 additions and 47 deletions

View File

@ -2,7 +2,9 @@ use crate::network::quote::BidQuote;
use crate::network::{encrypted_signature, quote, spot_price, transfer_proof}; use crate::network::{encrypted_signature, quote, spot_price, transfer_proof};
use crate::protocol::alice::{execution_setup, State3}; use crate::protocol::alice::{execution_setup, State3};
use anyhow::{anyhow, Error}; use anyhow::{anyhow, Error};
use libp2p::request_response::{RequestResponseEvent, RequestResponseMessage, ResponseChannel}; use libp2p::request_response::{
RequestId, RequestResponseEvent, RequestResponseMessage, ResponseChannel,
};
use libp2p::{NetworkBehaviour, PeerId}; use libp2p::{NetworkBehaviour, PeerId};
#[derive(Debug)] #[derive(Debug)]
@ -20,7 +22,10 @@ pub enum OutEvent {
bob_peer_id: PeerId, bob_peer_id: PeerId,
state3: Box<State3>, state3: Box<State3>,
}, },
TransferProofAcknowledged(PeerId), TransferProofAcknowledged {
peer: PeerId,
id: RequestId,
},
EncryptedSignatureReceived { EncryptedSignatureReceived {
msg: Box<encrypted_signature::Request>, msg: Box<encrypted_signature::Request>,
channel: ResponseChannel<()>, channel: ResponseChannel<()>,
@ -77,7 +82,12 @@ impl From<(PeerId, transfer_proof::Message)> for OutEvent {
fn from((peer, message): (PeerId, transfer_proof::Message)) -> Self { fn from((peer, message): (PeerId, transfer_proof::Message)) -> Self {
match message { match message {
transfer_proof::Message::Request { .. } => OutEvent::unexpected_request(peer), transfer_proof::Message::Request { .. } => OutEvent::unexpected_request(peer),
transfer_proof::Message::Response { .. } => OutEvent::TransferProofAcknowledged(peer), transfer_proof::Message::Response { request_id, .. } => {
OutEvent::TransferProofAcknowledged {
peer,
id: request_id,
}
}
} }
} }
} }

View File

@ -10,15 +10,26 @@ use anyhow::{bail, Context, Result};
use futures::future; use futures::future;
use futures::future::{BoxFuture, FutureExt}; use futures::future::{BoxFuture, FutureExt};
use futures::stream::{FuturesUnordered, StreamExt}; use futures::stream::{FuturesUnordered, StreamExt};
use libp2p::request_response::{RequestId, ResponseChannel};
use libp2p::swarm::SwarmEvent; use libp2p::swarm::SwarmEvent;
use libp2p::{PeerId, Swarm}; use libp2p::{PeerId, Swarm};
use rand::rngs::OsRng; use rand::rngs::OsRng;
use std::collections::HashMap; use std::collections::HashMap;
use std::convert::Infallible; use std::convert::Infallible;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::mpsc;
use uuid::Uuid; use uuid::Uuid;
/// A future that resolves to a tuple of `PeerId`, `transfer_proof::Request` and
/// `Responder`.
///
/// When this future resolves, the `transfer_proof::Request` shall be sent to
/// the peer identified by the `PeerId`. Once the request has been acknowledged
/// by the peer, i.e. a `()` response has been received, the `Responder` shall
/// be used to let the original sender know about the successful transfer.
type OutgoingTransferProof =
BoxFuture<'static, Result<(PeerId, transfer_proof::Request, bmrng::Responder<()>)>>;
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct EventLoop<RS> { pub struct EventLoop<RS> {
swarm: libp2p::Swarm<Behaviour>, swarm: libp2p::Swarm<Behaviour>,
@ -29,18 +40,22 @@ pub struct EventLoop<RS> {
latest_rate: RS, latest_rate: RS,
max_buy: bitcoin::Amount, max_buy: bitcoin::Amount,
/// Stores a sender per peer for incoming [`EncryptedSignature`]s.
recv_encrypted_signature: HashMap<PeerId, oneshot::Sender<encrypted_signature::Request>>,
/// Stores a list of futures, waiting for transfer proof which will be sent
/// to the given peer.
send_transfer_proof:
FuturesUnordered<BoxFuture<'static, Result<(PeerId, transfer_proof::Request)>>>,
swap_sender: mpsc::Sender<Swap>, swap_sender: mpsc::Sender<Swap>,
/// Stores a sender per peer for incoming [`EncryptedSignature`]s.
recv_encrypted_signature:
HashMap<PeerId, bmrng::RequestSender<encrypted_signature::Request, ()>>,
inflight_encrypted_signatures: FuturesUnordered<BoxFuture<'static, ResponseChannel<()>>>,
send_transfer_proof: FuturesUnordered<OutgoingTransferProof>,
/// Tracks [`transfer_proof::Request`]s which could not yet be sent because /// Tracks [`transfer_proof::Request`]s which could not yet be sent because
/// we are currently disconnected from the peer. /// we are currently disconnected from the peer.
buffered_transfer_proofs: HashMap<PeerId, transfer_proof::Request>, buffered_transfer_proofs: HashMap<PeerId, (transfer_proof::Request, bmrng::Responder<()>)>,
/// Tracks [`transfer_proof::Request`]s which are currently inflight and
/// awaiting an acknowledgement.
inflight_transfer_proofs: HashMap<RequestId, bmrng::Responder<()>>,
} }
impl<LR> EventLoop<LR> impl<LR> EventLoop<LR>
@ -68,8 +83,10 @@ where
swap_sender: swap_channel.sender, swap_sender: swap_channel.sender,
max_buy, max_buy,
recv_encrypted_signature: Default::default(), recv_encrypted_signature: Default::default(),
inflight_encrypted_signatures: Default::default(),
send_transfer_proof: Default::default(), send_transfer_proof: Default::default(),
buffered_transfer_proofs: Default::default(), buffered_transfer_proofs: Default::default(),
inflight_transfer_proofs: Default::default(),
}; };
Ok((event_loop, swap_channel.receiver)) Ok((event_loop, swap_channel.receiver))
} }
@ -79,9 +96,11 @@ where
} }
pub async fn run(mut self) { pub async fn run(mut self) {
// ensure that the send_transfer_proof stream is NEVER empty, otherwise it will // ensure that these streams are NEVER empty, otherwise it will
// terminate forever. // terminate forever.
self.send_transfer_proof.push(future::pending().boxed()); self.send_transfer_proof.push(future::pending().boxed());
self.inflight_encrypted_signatures
.push(future::pending().boxed());
let unfinished_swaps = match self.db.unfinished_alice() { let unfinished_swaps = match self.db.unfinished_alice() {
Ok(unfinished_swaps) => unfinished_swaps, Ok(unfinished_swaps) => unfinished_swaps,
@ -169,21 +188,34 @@ where
SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone{bob_peer_id, state3}) => { SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone{bob_peer_id, state3}) => {
let _ = self.handle_execution_setup_done(bob_peer_id, *state3).await; let _ = self.handle_execution_setup_done(bob_peer_id, *state3).await;
} }
SwarmEvent::Behaviour(OutEvent::TransferProofAcknowledged(peer)) => { SwarmEvent::Behaviour(OutEvent::TransferProofAcknowledged { peer, id }) => {
tracing::trace!(%peer, "Bob acknowledged transfer proof"); tracing::trace!(%peer, "Bob acknowledged transfer proof");
if let Some(responder) = self.inflight_transfer_proofs.remove(&id) {
let _ = responder.respond(());
}
} }
SwarmEvent::Behaviour(OutEvent::EncryptedSignatureReceived{ msg, channel, peer }) => { SwarmEvent::Behaviour(OutEvent::EncryptedSignatureReceived{ msg, channel, peer }) => {
match self.recv_encrypted_signature.remove(&peer) { let sender = match self.recv_encrypted_signature.remove(&peer) {
Some(sender) => { Some(sender) => sender,
// this failing just means the receiver is no longer interested ...
let _ = sender.send(*msg);
},
None => { None => {
tracing::warn!(%peer, "No sender for encrypted signature, maybe already handled?") tracing::warn!(%peer, "No sender for encrypted signature, maybe already handled?");
continue;
} }
} };
let _ = self.swarm.encrypted_signature.send_response(channel, ()); let mut responder = match sender.send(*msg).await {
Ok(responder) => responder,
Err(_) => {
tracing::warn!(%peer, "Failed to relay encrypted signature to swap");
continue;
}
};
self.inflight_encrypted_signatures.push(async move {
let _ = responder.recv().await;
channel
}.boxed());
} }
SwarmEvent::Behaviour(OutEvent::ResponseSent) => {} SwarmEvent::Behaviour(OutEvent::ResponseSent) => {}
SwarmEvent::Behaviour(OutEvent::Failure {peer, error}) => { SwarmEvent::Behaviour(OutEvent::Failure {peer, error}) => {
@ -192,10 +224,11 @@ where
SwarmEvent::ConnectionEstablished { peer_id: peer, endpoint, .. } => { SwarmEvent::ConnectionEstablished { peer_id: peer, endpoint, .. } => {
tracing::debug!(%peer, address = %endpoint.get_remote_address(), "New connection established"); tracing::debug!(%peer, address = %endpoint.get_remote_address(), "New connection established");
if let Some(transfer_proof) = self.buffered_transfer_proofs.remove(&peer) { if let Some((transfer_proof, responder)) = self.buffered_transfer_proofs.remove(&peer) {
tracing::debug!(%peer, "Found buffered transfer proof for peer"); tracing::debug!(%peer, "Found buffered transfer proof for peer");
self.swarm.transfer_proof.send_request(&peer, transfer_proof); let id = self.swarm.transfer_proof.send_request(&peer, transfer_proof);
self.inflight_transfer_proofs.insert(id, responder);
} }
} }
SwarmEvent::IncomingConnectionError { send_back_addr: address, error, .. } => { SwarmEvent::IncomingConnectionError { send_back_addr: address, error, .. } => {
@ -216,14 +249,15 @@ where
}, },
next_transfer_proof = self.send_transfer_proof.next() => { next_transfer_proof = self.send_transfer_proof.next() => {
match next_transfer_proof { match next_transfer_proof {
Some(Ok((peer, transfer_proof))) => { Some(Ok((peer, transfer_proof, responder))) => {
if !self.swarm.transfer_proof.is_connected(&peer) { if !self.swarm.transfer_proof.is_connected(&peer) {
tracing::warn!(%peer, "No active connection to peer, buffering transfer proof"); tracing::warn!(%peer, "No active connection to peer, buffering transfer proof");
self.buffered_transfer_proofs.insert(peer, transfer_proof); self.buffered_transfer_proofs.insert(peer, (transfer_proof, responder));
continue; continue;
} }
self.swarm.transfer_proof.send_request(&peer, transfer_proof); let id = self.swarm.transfer_proof.send_request(&peer, transfer_proof);
self.inflight_transfer_proofs.insert(id, responder);
}, },
Some(Err(_)) => { Some(Err(_)) => {
tracing::debug!("A swap stopped without sending a transfer proof"); tracing::debug!("A swap stopped without sending a transfer proof");
@ -233,6 +267,9 @@ where
} }
} }
} }
Some(response_channel) = self.inflight_encrypted_signatures.next() => {
let _ = self.swarm.encrypted_signature.send_response(response_channel, ());
}
} }
} }
} }
@ -315,23 +352,25 @@ where
/// Create a new [`EventLoopHandle`] that is scoped for communication with /// Create a new [`EventLoopHandle`] that is scoped for communication with
/// the given peer. /// the given peer.
fn new_handle(&mut self, peer: PeerId) -> EventLoopHandle { fn new_handle(&mut self, peer: PeerId) -> EventLoopHandle {
let (send_transfer_proof_sender, send_transfer_proof_receiver) = oneshot::channel(); // we deliberately don't put timeouts on these channels because the swap always
let (recv_enc_sig_sender, recv_enc_sig_receiver) = oneshot::channel(); // races these futures against a timelock
let (transfer_proof_sender, mut transfer_proof_receiver) = bmrng::channel(1);
let encrypted_signature = bmrng::channel(1);
self.recv_encrypted_signature self.recv_encrypted_signature
.insert(peer, recv_enc_sig_sender); .insert(peer, encrypted_signature.0);
self.send_transfer_proof.push( self.send_transfer_proof.push(
async move { async move {
let transfer_proof = send_transfer_proof_receiver.await?; let (request, responder) = transfer_proof_receiver.recv().await?;
Ok((peer, transfer_proof)) Ok((peer, request, responder))
} }
.boxed(), .boxed(),
); );
EventLoopHandle { EventLoopHandle {
recv_encrypted_signature: Some(recv_enc_sig_receiver), recv_encrypted_signature: Some(encrypted_signature.1),
send_transfer_proof: Some(send_transfer_proof_sender), send_transfer_proof: Some(transfer_proof_sender),
} }
} }
} }
@ -360,32 +399,33 @@ impl LatestRate for kraken::RateUpdateStream {
#[derive(Debug)] #[derive(Debug)]
pub struct EventLoopHandle { pub struct EventLoopHandle {
recv_encrypted_signature: Option<oneshot::Receiver<encrypted_signature::Request>>, recv_encrypted_signature: Option<bmrng::RequestReceiver<encrypted_signature::Request, ()>>,
send_transfer_proof: Option<oneshot::Sender<transfer_proof::Request>>, send_transfer_proof: Option<bmrng::RequestSender<transfer_proof::Request, ()>>,
} }
impl EventLoopHandle { impl EventLoopHandle {
pub async fn recv_encrypted_signature(&mut self) -> Result<bitcoin::EncryptedSignature> { pub async fn recv_encrypted_signature(&mut self) -> Result<bitcoin::EncryptedSignature> {
let signature = self let (request, responder) = self
.recv_encrypted_signature .recv_encrypted_signature
.take() .take()
.context("Encrypted signature was already received")? .context("Encrypted signature was already received")?
.await? .recv()
.tx_redeem_encsig; .await?;
Ok(signature) responder
.respond(())
.context("Failed to acknowledge receipt of encrypted signature")?;
Ok(request.tx_redeem_encsig)
} }
pub async fn send_transfer_proof(&mut self, msg: monero::TransferProof) -> Result<()> { pub async fn send_transfer_proof(&mut self, msg: monero::TransferProof) -> Result<()> {
if self self.send_transfer_proof
.send_transfer_proof
.take() .take()
.context("Transfer proof was already sent")? .context("Transfer proof was already sent")?
.send(transfer_proof::Request { tx_lock_proof: msg }) .send_receive(transfer_proof::Request { tx_lock_proof: msg })
.is_err() .await
{ .context("Failed to send transfer proof")?;
bail!("Failed to send transfer proof, receiver no longer listening?")
}
Ok(()) Ok(())
} }