Timeout for Alice waiting for ack for sending transfer proof

If dialing Bob fails Alice waits for the acknowledgement of the transfer proof indefinitely.
The timout prevents her execution from hanging.
Added a ToDo to re-visit the ack receivers. They don't add value at the moment and should be removed.
This commit is contained in:
Daniel Karzel 2021-02-01 18:14:53 +11:00
parent c9adbde5d5
commit 2d5d70d856
4 changed files with 36 additions and 14 deletions

View File

@ -1,4 +1,5 @@
use crate::{ use crate::{
execution_params::ExecutionParams,
network::{transport::SwapTransport, TokioExecutor}, network::{transport::SwapTransport, TokioExecutor},
protocol::{ protocol::{
alice::{Behaviour, OutEvent, State0, State3, SwapResponse, TransferProof}, alice::{Behaviour, OutEvent, State0, State3, SwapResponse, TransferProof},
@ -9,7 +10,10 @@ use anyhow::{anyhow, Context, Result};
use libp2p::{ use libp2p::{
core::Multiaddr, futures::FutureExt, request_response::ResponseChannel, PeerId, Swarm, core::Multiaddr, futures::FutureExt, request_response::ResponseChannel, PeerId, Swarm,
}; };
use tokio::sync::mpsc::{Receiver, Sender}; use tokio::{
sync::mpsc::{Receiver, Sender},
time::timeout,
};
use tracing::{error, trace}; use tracing::{error, trace};
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
@ -91,13 +95,27 @@ impl EventLoopHandle {
Ok(()) Ok(())
} }
pub async fn send_transfer_proof(&mut self, bob: PeerId, msg: TransferProof) -> Result<()> { pub async fn send_transfer_proof(
&mut self,
bob: PeerId,
msg: TransferProof,
execution_params: ExecutionParams,
) -> Result<()> {
let _ = self.send_transfer_proof.send((bob, msg)).await?; let _ = self.send_transfer_proof.send((bob, msg)).await?;
self.recv_transfer_proof_ack // TODO: Re-evaluate if these acknowledges are necessary at all.
.recv() // If we don't use a timeout here and Alice fails to dial Bob she will wait
// indefinitely for this acknowledge.
if timeout(
execution_params.bob_time_to_act,
self.recv_transfer_proof_ack.recv(),
)
.await .await
.ok_or_else(|| anyhow!("Failed to receive transfer proof ack from Bob"))?; .is_err()
{
error!("Failed to receive transfer proof ack from Bob")
}
Ok(()) Ok(())
} }
} }

View File

@ -96,6 +96,7 @@ pub async fn lock_xmr<W>(
state3: alice::State3, state3: alice::State3,
event_loop_handle: &mut EventLoopHandle, event_loop_handle: &mut EventLoopHandle,
monero_wallet: Arc<W>, monero_wallet: Arc<W>,
execution_params: ExecutionParams,
) -> Result<()> ) -> Result<()>
where where
W: Transfer, W: Transfer,
@ -117,9 +118,13 @@ where
// Otherwise Alice might publish the lock tx twice! // Otherwise Alice might publish the lock tx twice!
event_loop_handle event_loop_handle
.send_transfer_proof(bob_peer_id, TransferProof { .send_transfer_proof(
bob_peer_id,
TransferProof {
tx_lock_proof: transfer_proof, tx_lock_proof: transfer_proof,
}) },
execution_params,
)
.await?; .await?;
Ok(()) Ok(())

View File

@ -165,6 +165,7 @@ async fn run_until_internal(
*state3.clone(), *state3.clone(),
&mut event_loop_handle, &mut event_loop_handle,
monero_wallet.clone(), monero_wallet.clone(),
execution_params,
) )
.await?; .await?;

View File

@ -10,7 +10,7 @@ async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() {
let (bob_swap, bob_join_handle) = ctx.new_swap_as_bob().await; let (bob_swap, bob_join_handle) = ctx.new_swap_as_bob().await;
let alice_handle = alice::run(alice_swap); let alice_handle = alice::run(alice_swap);
tokio::spawn(alice_handle); let alice_swap_handle = tokio::spawn(alice_handle);
let bob_state = bob::run_until(bob_swap, is_btc_locked).await.unwrap(); let bob_state = bob::run_until(bob_swap, is_btc_locked).await.unwrap();
@ -57,10 +57,8 @@ async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() {
ctx.assert_bob_refunded(bob_state).await; ctx.assert_bob_refunded(bob_state).await;
// TODO: Alice hangs indefinitely waiting for Blob's acknowledge on let alice_state = alice_swap_handle.await.unwrap().unwrap();
// sending the transfer proof ctx.assert_alice_refunded(alice_state).await;
// let alice_state = alice_swap_handle.await.unwrap().unwrap();
// ctx.assert_alice_refunded(alice_state).await;
}) })
.await; .await;
} }