Buffer transfer proof if we are not connected to Bob

The request-response behaviour that is used for sending the transfer
proof actually has a functionality for buffering a message if we
are currently not connected. However, the request-response behaviour
also emits a dial attempt and **drops** all buffered messages if this
dial attempt fails. For us, the dial attempt will very likely always
fail because Bob is very likely behind NAT and we have to wait for
him to reconnect to us.

To mitigate this, we build our own buffer within the EventLoop and
send transfer proofs as soon as we are connected again.

Resolves #348.
This commit is contained in:
Thomas Eizinger 2021-03-22 16:28:59 +11:00
parent cde3f0f74a
commit 638a169a04
No known key found for this signature in database
GPG Key ID: 651AC83A6C6C8B96
5 changed files with 71 additions and 33 deletions

View File

@ -114,7 +114,8 @@ jobs:
matrix:
test_name: [
happy_path,
happy_path_restart_bob_before_comm,
happy_path_restart_bob_after_xmr_locked,
happy_path_restart_bob_before_xmr_locked,
bob_refunds_using_cancel_and_refund_command,
bob_refunds_using_cancel_and_refund_command_timelock_not_expired,
bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force,

View File

@ -233,9 +233,22 @@ impl Behaviour {
}
/// Send Transfer Proof to Bob.
pub fn send_transfer_proof(&mut self, bob: PeerId, msg: transfer_proof::Request) {
///
/// Fails and returns the transfer proof if we are currently not connected
/// to this peer.
pub fn send_transfer_proof(
&mut self,
bob: PeerId,
msg: transfer_proof::Request,
) -> Result<(), transfer_proof::Request> {
if !self.transfer_proof.is_connected(&bob) {
return Err(msg);
}
self.transfer_proof.send_request(&bob, msg);
debug!("Sent Transfer Proof");
debug!("Sending Transfer Proof");
Ok(())
}
pub fn send_encrypted_signature_ack(&mut self, channel: ResponseChannel<()>) {

View File

@ -38,6 +38,10 @@ pub struct EventLoop<RS> {
FuturesUnordered<BoxFuture<'static, Result<(PeerId, transfer_proof::Request)>>>,
swap_sender: mpsc::Sender<Swap>,
/// Tracks [`transfer_proof::Request`]s which could not yet be sent because
/// we are currently disconnected from the peer.
buffered_transfer_proofs: HashMap<PeerId, transfer_proof::Request>,
}
impl<LR> EventLoop<LR>
@ -66,6 +70,7 @@ where
max_buy,
recv_encrypted_signature: Default::default(),
send_transfer_proof: Default::default(),
buffered_transfer_proofs: Default::default(),
};
Ok((event_loop, swap_channel.receiver))
}
@ -152,6 +157,14 @@ where
}
SwarmEvent::ConnectionEstablished { peer_id: peer, endpoint, .. } => {
tracing::debug!(%peer, address = %endpoint.get_remote_address(), "New connection established");
if let Some(transfer_proof) = self.buffered_transfer_proofs.remove(&peer) {
tracing::debug!(%peer, "Found buffered transfer proof for peer");
self.swarm
.send_transfer_proof(peer, transfer_proof)
.expect("must be able to send transfer proof after connection was established");
}
}
SwarmEvent::IncomingConnectionError { send_back_addr: address, error, .. } => {
tracing::warn!(%address, "Failed to set up connection with peer: {}", error);
@ -172,7 +185,12 @@ where
next_transfer_proof = self.send_transfer_proof.next() => {
match next_transfer_proof {
Some(Ok((peer, transfer_proof))) => {
self.swarm.send_transfer_proof(peer, transfer_proof);
let result = self.swarm.send_transfer_proof(peer, transfer_proof);
if let Err(transfer_proof) = result {
tracing::warn!(%peer, "No active connection to peer, buffering transfer proof");
self.buffered_transfer_proofs.insert(peer, transfer_proof);
}
},
Some(Err(_)) => {
tracing::debug!("A swap stopped without sending a transfer proof");

View File

@ -2,7 +2,7 @@ pub mod testutils;
use swap::protocol::bob::BobState;
use swap::protocol::{alice, bob};
use testutils::bob_run_until::{is_btc_locked, is_xmr_locked};
use testutils::bob_run_until::is_xmr_locked;
use testutils::SlowCancelConfig;
#[tokio::test]
@ -32,31 +32,3 @@ async fn given_bob_restarts_after_xmr_is_locked_resume_swap() {
})
.await;
}
#[tokio::test]
async fn given_bob_restarts_before_xmr_is_locked_resume_swap() {
testutils::setup_test(SlowCancelConfig, |mut ctx| async move {
let (bob_swap, bob_join_handle) = ctx.bob_swap().await;
let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked));
let alice_swap = ctx.alice_next_swap().await;
let alice_swap = tokio::spawn(alice::run(alice_swap));
let bob_state = bob_swap.await??;
assert!(matches!(bob_state, BobState::BtcLocked { .. }));
let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await;
assert!(matches!(bob_swap.state, BobState::BtcLocked { .. }));
let bob_state = bob::run(bob_swap).await?;
ctx.assert_bob_redeemed(bob_state).await;
let alice_state = alice_swap.await??;
ctx.assert_alice_redeemed(alice_state).await;
Ok(())
})
.await;
}

View File

@ -0,0 +1,34 @@
pub mod testutils;
use swap::protocol::bob::BobState;
use swap::protocol::{alice, bob};
use testutils::bob_run_until::is_xmr_locked;
use testutils::SlowCancelConfig;
#[tokio::test]
async fn given_bob_restarts_after_xmr_is_locked_resume_swap() {
testutils::setup_test(SlowCancelConfig, |mut ctx| async move {
let (bob_swap, bob_join_handle) = ctx.bob_swap().await;
let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_xmr_locked));
let alice_swap = ctx.alice_next_swap().await;
let alice_swap = tokio::spawn(alice::run(alice_swap));
let bob_state = bob_swap.await??;
assert!(matches!(bob_state, BobState::XmrLocked { .. }));
let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await;
assert!(matches!(bob_swap.state, BobState::XmrLocked { .. }));
let bob_state = bob::run(bob_swap).await?;
ctx.assert_bob_redeemed(bob_state).await;
let alice_state = alice_swap.await??;
ctx.assert_alice_redeemed(alice_state).await;
Ok(())
})
.await;
}