Implement traits Receive{BitcoinRedeemEncsig, TransferProof}

Unfortunately, I had to put the wrap the swarm in Alice's `Network`
struct in an `Arc<Mutex<T>>` in order to be able to use `backoff` to
control the retry mechanism. This is because the stream of events
cannot be turned into a `SharedFuture` (unlike Bob's).

It would be good to find an alternative solution.
This commit is contained in:
Lucas Soriano del Pino 2020-10-27 17:18:19 +11:00
parent a4e4c27bee
commit 3f43581da7
3 changed files with 65 additions and 25 deletions

View File

@ -2,6 +2,7 @@
//! Alice holds XMR and wishes receive BTC. //! Alice holds XMR and wishes receive BTC.
use anyhow::Result; use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use backoff::{future::FutureOperation as _, ExponentialBackoff};
use genawaiter::GeneratorState; use genawaiter::GeneratorState;
use libp2p::{ use libp2p::{
core::{identity::Keypair, Multiaddr}, core::{identity::Keypair, Multiaddr},
@ -50,17 +51,20 @@ pub async fn swap(
punish_address: ::bitcoin::Address, punish_address: ::bitcoin::Address,
) -> Result<()> { ) -> Result<()> {
struct Network { struct Network {
swarm: Swarm, swarm: Arc<Mutex<Swarm>>,
channel: Option<ResponseChannel<AliceToBob>>, channel: Option<ResponseChannel<AliceToBob>>,
} }
impl Network { impl Network {
pub fn send_message2(&mut self, proof: monero::TransferProof) { pub async fn send_message2(&mut self, proof: monero::TransferProof) {
match self.channel.take() { match self.channel.take() {
None => warn!("Channel not found, did you call this twice?"), None => warn!("Channel not found, did you call this twice?"),
Some(channel) => self.swarm.send_message2(channel, alice::Message2 { Some(channel) => {
let mut guard = self.swarm.lock().await;
guard.send_message2(channel, alice::Message2 {
tx_lock_proof: proof, tx_lock_proof: proof,
}), })
}
} }
} }
} }
@ -68,7 +72,27 @@ pub async fn swap(
#[async_trait] #[async_trait]
impl ReceiveBitcoinRedeemEncsig for Network { impl ReceiveBitcoinRedeemEncsig for Network {
async fn receive_bitcoin_redeem_encsig(&mut self) -> xmr_btc::bitcoin::EncryptedSignature { async fn receive_bitcoin_redeem_encsig(&mut self) -> xmr_btc::bitcoin::EncryptedSignature {
todo!() #[derive(Debug)]
struct UnexpectedMessage;
(|| async {
let mut guard = self.swarm.lock().await;
let encsig = match guard.next().await {
OutEvent::Message3(msg) => msg.tx_redeem_encsig,
other => {
warn!("Expected Bob's Message3, got: {:?}", other);
return Err(backoff::Error::Transient(UnexpectedMessage));
}
};
Result::<_, backoff::Error<UnexpectedMessage>>::Ok(encsig)
})
.retry(ExponentialBackoff {
max_elapsed_time: None,
..Default::default()
})
.await
.expect("transient errors to be retried")
} }
} }
@ -144,7 +168,7 @@ pub async fn swap(
info!("Handshake complete, we now have State3 for Alice."); info!("Handshake complete, we now have State3 for Alice.");
let network = Arc::new(Mutex::new(Network { let network = Arc::new(Mutex::new(Network {
swarm, swarm: Arc::new(Mutex::new(swarm)),
channel: Some(channel), channel: Some(channel),
})); }));
@ -171,7 +195,7 @@ pub async fn swap(
.await?; .await?;
let mut guard = network.as_ref().lock().await; let mut guard = network.as_ref().lock().await;
guard.send_message2(transfer_proof); guard.send_message2(transfer_proof).await;
} }
GeneratorState::Yielded(Action::RedeemBtc(tx)) => { GeneratorState::Yielded(Action::RedeemBtc(tx)) => {

View File

@ -2,16 +2,17 @@
//! Bob holds BTC and wishes receive XMR. //! Bob holds BTC and wishes receive XMR.
use anyhow::Result; use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use backoff::{future::FutureOperation as _, ExponentialBackoff};
use futures::{ use futures::{
channel::mpsc::{Receiver, Sender}, channel::mpsc::{Receiver, Sender},
StreamExt, FutureExt, StreamExt,
}; };
use genawaiter::GeneratorState; use genawaiter::GeneratorState;
use libp2p::{core::identity::Keypair, Multiaddr, NetworkBehaviour, PeerId}; use libp2p::{core::identity::Keypair, Multiaddr, NetworkBehaviour, PeerId};
use rand::rngs::OsRng; use rand::rngs::OsRng;
use std::{process, sync::Arc}; use std::{process, sync::Arc};
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tracing::{debug, info}; use tracing::{debug, info, warn};
mod amounts; mod amounts;
mod message0; mod message0;
@ -52,7 +53,28 @@ pub async fn swap(
#[async_trait] #[async_trait]
impl ReceiveTransferProof for Network { impl ReceiveTransferProof for Network {
async fn receive_transfer_proof(&mut self) -> monero::TransferProof { async fn receive_transfer_proof(&mut self) -> monero::TransferProof {
todo!() #[derive(Debug)]
struct UnexpectedMessage;
let future = self.0.next().shared();
(|| async {
let proof = match future.clone().await {
OutEvent::Message2(msg) => msg.tx_lock_proof,
other => {
warn!("Expected Alice's Message2, got: {:?}", other);
return Err(backoff::Error::Transient(UnexpectedMessage));
}
};
Result::<_, backoff::Error<UnexpectedMessage>>::Ok(proof)
})
.retry(ExponentialBackoff {
max_elapsed_time: None,
..Default::default()
})
.await
.expect("transient errors to be retried")
} }
} }
@ -193,7 +215,7 @@ fn new_swarm() -> Result<Swarm> {
} }
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
#[derive(Debug)] #[derive(Debug, Clone)]
pub enum OutEvent { pub enum OutEvent {
ConnectionEstablished(PeerId), ConnectionEstablished(PeerId),
Amounts(SwapAmounts), Amounts(SwapAmounts),

View File

@ -168,7 +168,7 @@ async fn swap_as_alice(
} }
async fn swap_as_bob( async fn swap_as_bob(
network: BobNetwork, network: Arc<Mutex<BobNetwork>>,
mut sender: Sender<EncryptedSignature>, mut sender: Sender<EncryptedSignature>,
monero_wallet: Arc<harness::wallet::monero::Wallet>, monero_wallet: Arc<harness::wallet::monero::Wallet>,
bitcoin_wallet: Arc<harness::wallet::bitcoin::Wallet>, bitcoin_wallet: Arc<harness::wallet::bitcoin::Wallet>,
@ -274,11 +274,9 @@ async fn on_chain_happy_path() {
let (alice_network, bob_sender) = Network::<EncryptedSignature>::new(); let (alice_network, bob_sender) = Network::<EncryptedSignature>::new();
let (bob_network, alice_sender) = Network::<TransferProof>::new(); let (bob_network, alice_sender) = Network::<TransferProof>::new();
let alice_network = Arc::new(Mutex::new(alice_network));
try_join( try_join(
swap_as_alice( swap_as_alice(
alice_network, Arc::new(Mutex::new(alice_network)),
alice_sender, alice_sender,
alice_monero_wallet.clone(), alice_monero_wallet.clone(),
alice_bitcoin_wallet.clone(), alice_bitcoin_wallet.clone(),
@ -286,7 +284,7 @@ async fn on_chain_happy_path() {
alice, alice,
), ),
swap_as_bob( swap_as_bob(
bob_network, Arc::new(Mutex::new(bob_network)),
bob_sender, bob_sender,
bob_monero_wallet.clone(), bob_monero_wallet.clone(),
bob_bitcoin_wallet.clone(), bob_bitcoin_wallet.clone(),
@ -367,11 +365,9 @@ async fn on_chain_both_refund_if_alice_never_redeems() {
let (alice_network, bob_sender) = Network::<EncryptedSignature>::new(); let (alice_network, bob_sender) = Network::<EncryptedSignature>::new();
let (bob_network, alice_sender) = Network::<TransferProof>::new(); let (bob_network, alice_sender) = Network::<TransferProof>::new();
let alice_network = Arc::new(Mutex::new(alice_network));
try_join( try_join(
swap_as_alice( swap_as_alice(
alice_network, Arc::new(Mutex::new(alice_network)),
alice_sender, alice_sender,
alice_monero_wallet.clone(), alice_monero_wallet.clone(),
alice_bitcoin_wallet.clone(), alice_bitcoin_wallet.clone(),
@ -382,7 +378,7 @@ async fn on_chain_both_refund_if_alice_never_redeems() {
alice, alice,
), ),
swap_as_bob( swap_as_bob(
bob_network, Arc::new(Mutex::new(bob_network)),
bob_sender, bob_sender,
bob_monero_wallet.clone(), bob_monero_wallet.clone(),
bob_bitcoin_wallet.clone(), bob_bitcoin_wallet.clone(),
@ -464,10 +460,8 @@ async fn on_chain_alice_punishes_if_bob_never_acts_after_fund() {
let (alice_network, bob_sender) = Network::<EncryptedSignature>::new(); let (alice_network, bob_sender) = Network::<EncryptedSignature>::new();
let (bob_network, alice_sender) = Network::<TransferProof>::new(); let (bob_network, alice_sender) = Network::<TransferProof>::new();
let alice_network = Arc::new(Mutex::new(alice_network));
let alice_swap = swap_as_alice( let alice_swap = swap_as_alice(
alice_network, Arc::new(Mutex::new(alice_network)),
alice_sender, alice_sender,
alice_monero_wallet.clone(), alice_monero_wallet.clone(),
alice_bitcoin_wallet.clone(), alice_bitcoin_wallet.clone(),
@ -475,7 +469,7 @@ async fn on_chain_alice_punishes_if_bob_never_acts_after_fund() {
alice, alice,
); );
let bob_swap = swap_as_bob( let bob_swap = swap_as_bob(
bob_network, Arc::new(Mutex::new(bob_network)),
bob_sender, bob_sender,
bob_monero_wallet.clone(), bob_monero_wallet.clone(),
bob_bitcoin_wallet.clone(), bob_bitcoin_wallet.clone(),