diff --git a/swap/src/alice.rs b/swap/src/alice.rs index 1b7a2505..7149e06f 100644 --- a/swap/src/alice.rs +++ b/swap/src/alice.rs @@ -2,6 +2,7 @@ //! Alice holds XMR and wishes receive BTC. use anyhow::Result; use async_trait::async_trait; +use backoff::{future::FutureOperation as _, ExponentialBackoff}; use genawaiter::GeneratorState; use libp2p::{ core::{identity::Keypair, Multiaddr}, @@ -50,17 +51,20 @@ pub async fn swap( punish_address: ::bitcoin::Address, ) -> Result<()> { struct Network { - swarm: Swarm, + swarm: Arc>, channel: Option>, } impl Network { - pub fn send_message2(&mut self, proof: monero::TransferProof) { + pub async fn send_message2(&mut self, proof: monero::TransferProof) { match self.channel.take() { None => warn!("Channel not found, did you call this twice?"), - Some(channel) => self.swarm.send_message2(channel, alice::Message2 { - tx_lock_proof: proof, - }), + Some(channel) => { + let mut guard = self.swarm.lock().await; + guard.send_message2(channel, alice::Message2 { + tx_lock_proof: proof, + }) + } } } } @@ -68,7 +72,27 @@ pub async fn swap( #[async_trait] impl ReceiveBitcoinRedeemEncsig for Network { async fn receive_bitcoin_redeem_encsig(&mut self) -> xmr_btc::bitcoin::EncryptedSignature { - todo!() + #[derive(Debug)] + struct UnexpectedMessage; + + (|| async { + let mut guard = self.swarm.lock().await; + let encsig = match guard.next().await { + OutEvent::Message3(msg) => msg.tx_redeem_encsig, + other => { + warn!("Expected Bob's Message3, got: {:?}", other); + return Err(backoff::Error::Transient(UnexpectedMessage)); + } + }; + + Result::<_, backoff::Error>::Ok(encsig) + }) + .retry(ExponentialBackoff { + max_elapsed_time: None, + ..Default::default() + }) + .await + .expect("transient errors to be retried") } } @@ -144,7 +168,7 @@ pub async fn swap( info!("Handshake complete, we now have State3 for Alice."); let network = Arc::new(Mutex::new(Network { - swarm, + swarm: Arc::new(Mutex::new(swarm)), channel: Some(channel), })); @@ -171,7 +195,7 @@ pub async fn swap( .await?; let mut guard = network.as_ref().lock().await; - guard.send_message2(transfer_proof); + guard.send_message2(transfer_proof).await; } GeneratorState::Yielded(Action::RedeemBtc(tx)) => { diff --git a/swap/src/bob.rs b/swap/src/bob.rs index 97327448..4194a7bd 100644 --- a/swap/src/bob.rs +++ b/swap/src/bob.rs @@ -2,16 +2,17 @@ //! Bob holds BTC and wishes receive XMR. use anyhow::Result; use async_trait::async_trait; +use backoff::{future::FutureOperation as _, ExponentialBackoff}; use futures::{ channel::mpsc::{Receiver, Sender}, - StreamExt, + FutureExt, StreamExt, }; use genawaiter::GeneratorState; use libp2p::{core::identity::Keypair, Multiaddr, NetworkBehaviour, PeerId}; use rand::rngs::OsRng; use std::{process, sync::Arc}; use tokio::sync::Mutex; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; mod amounts; mod message0; @@ -52,7 +53,28 @@ pub async fn swap( #[async_trait] impl ReceiveTransferProof for Network { async fn receive_transfer_proof(&mut self) -> monero::TransferProof { - todo!() + #[derive(Debug)] + struct UnexpectedMessage; + + let future = self.0.next().shared(); + + (|| async { + let proof = match future.clone().await { + OutEvent::Message2(msg) => msg.tx_lock_proof, + other => { + warn!("Expected Alice's Message2, got: {:?}", other); + return Err(backoff::Error::Transient(UnexpectedMessage)); + } + }; + + Result::<_, backoff::Error>::Ok(proof) + }) + .retry(ExponentialBackoff { + max_elapsed_time: None, + ..Default::default() + }) + .await + .expect("transient errors to be retried") } } @@ -193,7 +215,7 @@ fn new_swarm() -> Result { } #[allow(clippy::large_enum_variant)] -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum OutEvent { ConnectionEstablished(PeerId), Amounts(SwapAmounts), diff --git a/xmr-btc/tests/on_chain.rs b/xmr-btc/tests/on_chain.rs index 0d897b7a..89eaf1f1 100644 --- a/xmr-btc/tests/on_chain.rs +++ b/xmr-btc/tests/on_chain.rs @@ -168,7 +168,7 @@ async fn swap_as_alice( } async fn swap_as_bob( - network: BobNetwork, + network: Arc>, mut sender: Sender, monero_wallet: Arc, bitcoin_wallet: Arc, @@ -274,11 +274,9 @@ async fn on_chain_happy_path() { let (alice_network, bob_sender) = Network::::new(); let (bob_network, alice_sender) = Network::::new(); - let alice_network = Arc::new(Mutex::new(alice_network)); - try_join( swap_as_alice( - alice_network, + Arc::new(Mutex::new(alice_network)), alice_sender, alice_monero_wallet.clone(), alice_bitcoin_wallet.clone(), @@ -286,7 +284,7 @@ async fn on_chain_happy_path() { alice, ), swap_as_bob( - bob_network, + Arc::new(Mutex::new(bob_network)), bob_sender, bob_monero_wallet.clone(), bob_bitcoin_wallet.clone(), @@ -367,11 +365,9 @@ async fn on_chain_both_refund_if_alice_never_redeems() { let (alice_network, bob_sender) = Network::::new(); let (bob_network, alice_sender) = Network::::new(); - let alice_network = Arc::new(Mutex::new(alice_network)); - try_join( swap_as_alice( - alice_network, + Arc::new(Mutex::new(alice_network)), alice_sender, alice_monero_wallet.clone(), alice_bitcoin_wallet.clone(), @@ -382,7 +378,7 @@ async fn on_chain_both_refund_if_alice_never_redeems() { alice, ), swap_as_bob( - bob_network, + Arc::new(Mutex::new(bob_network)), bob_sender, bob_monero_wallet.clone(), bob_bitcoin_wallet.clone(), @@ -464,10 +460,8 @@ async fn on_chain_alice_punishes_if_bob_never_acts_after_fund() { let (alice_network, bob_sender) = Network::::new(); let (bob_network, alice_sender) = Network::::new(); - let alice_network = Arc::new(Mutex::new(alice_network)); - let alice_swap = swap_as_alice( - alice_network, + Arc::new(Mutex::new(alice_network)), alice_sender, alice_monero_wallet.clone(), alice_bitcoin_wallet.clone(), @@ -475,7 +469,7 @@ async fn on_chain_alice_punishes_if_bob_never_acts_after_fund() { alice, ); let bob_swap = swap_as_bob( - bob_network, + Arc::new(Mutex::new(bob_network)), bob_sender, bob_monero_wallet.clone(), bob_bitcoin_wallet.clone(),