Multiple swaps with the same peer

- Swap-id is exchanged during execution setup. CLI (Bob) sends the swap-id to be used in his first message.
- Transfer poof and encryption signature messages include the swap-id so it can be properly associated with the correct swap.
- ASB: Encryption signatures are associated with swaps by swap-id, not peer-id.
- ASB: Transfer proofs are still associated to peer-ids (because they have to be sent to the respective peer), but the ASB can buffer multiple
- CLI: Incoming transfer proofs are checked for matching swap-id. If a transfer proof with a different swap-id than the current executing swap is received it will be ignored. We can change this to saving into the database.

Includes concurrent swap tests with the same Bob.

- One test that pauses and starts an additional swap after the transfer proof was received. Results in both swaps being redeemed after resuming the first swap.
- One test that pauses and starts an additional swap before the transfer proof is sent (just after BTC locked). Results in the second swap redeeming and the first swap being refunded (because the transfer proof on Bob's side is lost). Once we store transfer proofs that we receive during executing a different swap into the database both swaps should redeem.

Note that the monero harness was adapted to allow creating wallets with multiple outputs, which is needed for Alice.
This commit is contained in:
Daniel Karzel 2021-04-08 18:56:26 +10:00
parent 46f144ac67
commit c976358c37
No known key found for this signature in database
GPG Key ID: 30C3FC2E438ADB6E
29 changed files with 430 additions and 114 deletions

View File

@ -116,7 +116,10 @@ jobs:
bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force, bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force,
punish, punish,
alice_punishes_after_restart_punish_timelock_expired, alice_punishes_after_restart_punish_timelock_expired,
alice_refunds_after_restart_bob_refunded alice_refunds_after_restart_bob_refunded,
ensure_same_swap_id,
concurrent_bobs_after_xmr_lock_proof_sent,
concurrent_bobs_before_xmr_lock_proof_sent
] ]
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:

View File

@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
### Changed
- Allow multiple concurrent swaps with the same peer on the ASB.
This is a breaking change because the swap ID is now agreed upon between CLI and ASB during swap setup.
Resuming swaps started prior to this change can result in unexpected behaviour.
## [0.4.0] - 2021-04-06 ## [0.4.0] - 2021-04-06
### Changed ### Changed

View File

@ -15,5 +15,8 @@ status = [
"docker_tests (bob_refunds_using_cancel_and_refund_command_timelock_not_expired)", "docker_tests (bob_refunds_using_cancel_and_refund_command_timelock_not_expired)",
"docker_tests (punish)", "docker_tests (punish)",
"docker_tests (alice_punishes_after_restart_punish_timelock_expired)", "docker_tests (alice_punishes_after_restart_punish_timelock_expired)",
"docker_tests (alice_refunds_after_restart_bob_refunded)" "docker_tests (alice_refunds_after_restart_bob_refunded)",
"docker_tests (ensure_same_swap_id)",
"docker_tests (concurrent_bobs_after_xmr_lock_proof_sent)",
"docker_tests (concurrent_bobs_before_xmr_lock_proof_sent)"
] ]

View File

@ -98,7 +98,7 @@ impl<'c> Monero {
Ok(wallet) Ok(wallet)
} }
pub async fn init(&self, wallet_amount: Vec<(&str, u64)>) -> Result<()> { pub async fn init_miner(&self) -> Result<()> {
let miner_wallet = self.wallet("miner")?; let miner_wallet = self.wallet("miner")?;
let miner_address = miner_wallet.address().await?.address; let miner_address = miner_wallet.address().await?.address;
@ -108,17 +108,34 @@ impl<'c> Monero {
tracing::info!("Generated {:?} blocks", res.blocks.len()); tracing::info!("Generated {:?} blocks", res.blocks.len());
miner_wallet.refresh().await?; miner_wallet.refresh().await?;
for (wallet, amount) in wallet_amount.iter() { Ok(())
if *amount > 0 { }
let wallet = self.wallet(wallet)?;
let address = wallet.address().await?.address; pub async fn init_wallet(&self, name: &str, amount_in_outputs: Vec<u64>) -> Result<()> {
miner_wallet.transfer(&address, *amount).await?; let miner_wallet = self.wallet("miner")?;
let miner_address = miner_wallet.address().await?.address;
let monerod = &self.monerod;
let wallet = self.wallet(name)?;
let address = wallet.address().await?.address;
for amount in amount_in_outputs {
if amount > 0 {
miner_wallet.transfer(&address, amount).await?;
tracing::info!("Funded {} wallet with {}", wallet.name, amount); tracing::info!("Funded {} wallet with {}", wallet.name, amount);
monerod.client().generate_blocks(10, &miner_address).await?; monerod.client().generate_blocks(10, &miner_address).await?;
wallet.refresh().await?; wallet.refresh().await?;
} }
} }
Ok(())
}
pub async fn start_miner(&self) -> Result<()> {
let miner_wallet = self.wallet("miner")?;
let miner_address = miner_wallet.address().await?.address;
let monerod = &self.monerod;
monerod.start_miner(&miner_address).await?; monerod.start_miner(&miner_address).await?;
tracing::info!("Waiting for miner wallet to catch up..."); tracing::info!("Waiting for miner wallet to catch up...");
@ -130,6 +147,13 @@ impl<'c> Monero {
Ok(()) Ok(())
} }
pub async fn init_and_start_miner(&self) -> Result<()> {
self.init_miner().await?;
self.start_miner().await?;
Ok(())
}
} }
fn random_prefix() -> String { fn random_prefix() -> String {

View File

@ -14,7 +14,7 @@ async fn init_miner_and_mine_to_miner_address() {
let tc = Cli::default(); let tc = Cli::default();
let (monero, _monerod_container) = Monero::new(&tc, vec![]).await.unwrap(); let (monero, _monerod_container) = Monero::new(&tc, vec![]).await.unwrap();
monero.init(vec![]).await.unwrap(); monero.init_and_start_miner().await.unwrap();
let monerod = monero.monerod(); let monerod = monero.monerod();
let miner_wallet = monero.wallet("miner").unwrap(); let miner_wallet = monero.wallet("miner").unwrap();

View File

@ -22,11 +22,10 @@ async fn fund_transfer_and_check_tx_key() {
let alice_wallet = monero.wallet("alice").unwrap(); let alice_wallet = monero.wallet("alice").unwrap();
let bob_wallet = monero.wallet("bob").unwrap(); let bob_wallet = monero.wallet("bob").unwrap();
// fund alice monero.init_miner().await.unwrap();
monero monero.init_wallet("alice", vec![fund_alice]).await.unwrap();
.init(vec![("alice", fund_alice), ("bob", fund_bob)]) monero.init_wallet("bob", vec![fund_bob]).await.unwrap();
.await monero.start_miner().await.unwrap();
.unwrap();
// check alice balance // check alice balance
let got_alice_balance = alice_wallet.balance().await.unwrap(); let got_alice_balance = alice_wallet.balance().await.unwrap();

View File

@ -107,8 +107,9 @@ async fn main() -> Result<()> {
let mut swarm = swarm::new::<Behaviour>(&seed)?; let mut swarm = swarm::new::<Behaviour>(&seed)?;
swarm.add_address(alice_peer_id, alice_multiaddr); swarm.add_address(alice_peer_id, alice_multiaddr);
let swap_id = Uuid::new_v4();
let (event_loop, mut event_loop_handle) = let (event_loop, mut event_loop_handle) =
EventLoop::new(swarm, alice_peer_id, bitcoin_wallet.clone())?; EventLoop::new(swap_id, swarm, alice_peer_id, bitcoin_wallet.clone())?;
let event_loop = tokio::spawn(event_loop.run()); let event_loop = tokio::spawn(event_loop.run());
let send_bitcoin = determine_btc_to_swap( let send_bitcoin = determine_btc_to_swap(
@ -128,7 +129,6 @@ async fn main() -> Result<()> {
) )
.await?; .await?;
let swap_id = Uuid::new_v4();
db.insert_peer_id(swap_id, alice_peer_id).await?; db.insert_peer_id(swap_id, alice_peer_id).await?;
let swap = Builder::new( let swap = Builder::new(
@ -190,7 +190,7 @@ async fn main() -> Result<()> {
swarm.add_address(alice_peer_id, alice_multiaddr); swarm.add_address(alice_peer_id, alice_multiaddr);
let (event_loop, event_loop_handle) = let (event_loop, event_loop_handle) =
EventLoop::new(swarm, alice_peer_id, bitcoin_wallet.clone())?; EventLoop::new(swap_id, swarm, alice_peer_id, bitcoin_wallet.clone())?;
let handle = tokio::spawn(event_loop.run()); let handle = tokio::spawn(event_loop.run());
let swap = Builder::new( let swap = Builder::new(

View File

@ -5,6 +5,7 @@ use libp2p::request_response::{
RequestResponseMessage, RequestResponseMessage,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use uuid::Uuid;
pub type OutEvent = RequestResponseEvent<Request, ()>; pub type OutEvent = RequestResponseEvent<Request, ()>;
@ -19,6 +20,7 @@ impl ProtocolName for EncryptedSignatureProtocol {
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Request { pub struct Request {
pub swap_id: Uuid,
pub tx_redeem_encsig: crate::bitcoin::EncryptedSignature, pub tx_redeem_encsig: crate::bitcoin::EncryptedSignature,
} }

View File

@ -6,6 +6,7 @@ use libp2p::request_response::{
RequestResponseMessage, RequestResponseMessage,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use uuid::Uuid;
pub type OutEvent = RequestResponseEvent<Request, ()>; pub type OutEvent = RequestResponseEvent<Request, ()>;
@ -20,6 +21,7 @@ impl ProtocolName for TransferProofProtocol {
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Request { pub struct Request {
pub swap_id: Uuid,
pub tx_lock_proof: monero::TransferProof, pub tx_lock_proof: monero::TransferProof,
} }

View File

@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
use sha2::Sha256; use sha2::Sha256;
use sigma_fun::ext::dl_secp256k1_ed25519_eq::{CrossCurveDLEQ, CrossCurveDLEQProof}; use sigma_fun::ext::dl_secp256k1_ed25519_eq::{CrossCurveDLEQ, CrossCurveDLEQProof};
use sigma_fun::HashTranscript; use sigma_fun::HashTranscript;
use uuid::Uuid;
pub mod alice; pub mod alice;
pub mod bob; pub mod bob;
@ -18,14 +19,9 @@ pub static CROSS_CURVE_PROOF_SYSTEM: Lazy<
) )
}); });
#[derive(Debug, Copy, Clone)]
pub struct StartingBalances {
pub xmr: monero::Amount,
pub btc: bitcoin::Amount,
}
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Message0 { pub struct Message0 {
swap_id: Uuid,
B: bitcoin::PublicKey, B: bitcoin::PublicKey,
S_b_monero: monero::PublicKey, S_b_monero: monero::PublicKey,
S_b_bitcoin: bitcoin::PublicKey, S_b_bitcoin: bitcoin::PublicKey,

View File

@ -6,6 +6,7 @@ use libp2p::request_response::{
RequestId, RequestResponseEvent, RequestResponseMessage, ResponseChannel, RequestId, RequestResponseEvent, RequestResponseMessage, ResponseChannel,
}; };
use libp2p::{NetworkBehaviour, PeerId}; use libp2p::{NetworkBehaviour, PeerId};
use uuid::Uuid;
#[derive(Debug)] #[derive(Debug)]
pub enum OutEvent { pub enum OutEvent {
@ -20,6 +21,7 @@ pub enum OutEvent {
}, },
ExecutionSetupDone { ExecutionSetupDone {
bob_peer_id: PeerId, bob_peer_id: PeerId,
swap_id: Uuid,
state3: Box<State3>, state3: Box<State3>,
}, },
TransferProofAcknowledged { TransferProofAcknowledged {
@ -157,9 +159,11 @@ impl From<execution_setup::OutEvent> for OutEvent {
match event { match event {
Done { Done {
bob_peer_id, bob_peer_id,
swap_id,
state3, state3,
} => OutEvent::ExecutionSetupDone { } => OutEvent::ExecutionSetupDone {
bob_peer_id, bob_peer_id,
swap_id,
state3: Box::new(state3), state3: Box::new(state3),
}, },
Failure { peer, error } => OutEvent::Failure { peer, error }, Failure { peer, error } => OutEvent::Failure { peer, error },

View File

@ -3,7 +3,7 @@ use crate::database::Database;
use crate::env::Config; use crate::env::Config;
use crate::monero::BalanceTooLow; use crate::monero::BalanceTooLow;
use crate::network::quote::BidQuote; use crate::network::quote::BidQuote;
use crate::network::{encrypted_signature, spot_price, transfer_proof}; use crate::network::{spot_price, transfer_proof};
use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State0, State3, Swap}; use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State0, State3, Swap};
use crate::{bitcoin, kraken, monero}; use crate::{bitcoin, kraken, monero};
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
@ -43,9 +43,8 @@ pub struct EventLoop<RS> {
swap_sender: mpsc::Sender<Swap>, swap_sender: mpsc::Sender<Swap>,
/// Stores a sender per peer for incoming [`EncryptedSignature`]s. /// Stores incoming [`EncryptedSignature`]s per swap.
recv_encrypted_signature: recv_encrypted_signature: HashMap<Uuid, bmrng::RequestSender<bitcoin::EncryptedSignature, ()>>,
HashMap<PeerId, bmrng::RequestSender<encrypted_signature::Request, ()>>,
inflight_encrypted_signatures: FuturesUnordered<BoxFuture<'static, ResponseChannel<()>>>, inflight_encrypted_signatures: FuturesUnordered<BoxFuture<'static, ResponseChannel<()>>>,
send_transfer_proof: FuturesUnordered<OutgoingTransferProof>, send_transfer_proof: FuturesUnordered<OutgoingTransferProof>,
@ -120,7 +119,7 @@ where
} }
}; };
let handle = self.new_handle(peer_id); let handle = self.new_handle(peer_id, swap_id);
let swap = Swap { let swap = Swap {
event_loop_handle: handle, event_loop_handle: handle,
@ -186,25 +185,26 @@ where
tracing::debug!(%peer, "Failed to respond with quote"); tracing::debug!(%peer, "Failed to respond with quote");
} }
} }
SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone{bob_peer_id, state3}) => { SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone{bob_peer_id, swap_id, state3}) => {
let _ = self.handle_execution_setup_done(bob_peer_id, *state3).await; let _ = self.handle_execution_setup_done(bob_peer_id, swap_id, *state3).await;
} }
SwarmEvent::Behaviour(OutEvent::TransferProofAcknowledged { peer, id }) => { SwarmEvent::Behaviour(OutEvent::TransferProofAcknowledged { peer, id }) => {
tracing::trace!(%peer, "Bob acknowledged transfer proof"); tracing::debug!(%peer, "Bob acknowledged transfer proof");
if let Some(responder) = self.inflight_transfer_proofs.remove(&id) { if let Some(responder) = self.inflight_transfer_proofs.remove(&id) {
let _ = responder.respond(()); let _ = responder.respond(());
} }
} }
SwarmEvent::Behaviour(OutEvent::EncryptedSignatureReceived{ msg, channel, peer }) => { SwarmEvent::Behaviour(OutEvent::EncryptedSignatureReceived{ msg, channel, peer }) => {
let sender = match self.recv_encrypted_signature.remove(&peer) { let sender = match self.recv_encrypted_signature.remove(&msg.swap_id) {
Some(sender) => sender, Some(sender) => sender,
None => { None => {
// TODO: Don't just drop encsig if we currently don't have a running swap for it, save in db
tracing::warn!(%peer, "No sender for encrypted signature, maybe already handled?"); tracing::warn!(%peer, "No sender for encrypted signature, maybe already handled?");
continue; continue;
} }
}; };
let mut responder = match sender.send(*msg).await { let mut responder = match sender.send(msg.tx_redeem_encsig).await {
Ok(responder) => responder, Ok(responder) => responder,
Err(_) => { Err(_) => {
tracing::warn!(%peer, "Failed to relay encrypted signature to swap"); tracing::warn!(%peer, "Failed to relay encrypted signature to swap");
@ -262,8 +262,8 @@ where
let id = self.swarm.transfer_proof.send_request(&peer, transfer_proof); let id = self.swarm.transfer_proof.send_request(&peer, transfer_proof);
self.inflight_transfer_proofs.insert(id, responder); self.inflight_transfer_proofs.insert(id, responder);
}, },
Some(Err(_)) => { Some(Err(e)) => {
tracing::debug!("A swap stopped without sending a transfer proof"); tracing::debug!("A swap stopped without sending a transfer proof: {:#}", e);
} }
None => { None => {
unreachable!("stream of transfer proof receivers must never terminate") unreachable!("stream of transfer proof receivers must never terminate")
@ -319,9 +319,13 @@ where
}) })
} }
async fn handle_execution_setup_done(&mut self, bob_peer_id: PeerId, state3: State3) { async fn handle_execution_setup_done(
let swap_id = Uuid::new_v4(); &mut self,
let handle = self.new_handle(bob_peer_id); bob_peer_id: PeerId,
swap_id: Uuid,
state3: State3,
) {
let handle = self.new_handle(bob_peer_id, swap_id);
let initial_state = AliceState::Started { let initial_state = AliceState::Started {
state3: Box::new(state3), state3: Box::new(state3),
@ -337,7 +341,7 @@ where
swap_id, swap_id,
}; };
// TODO: Consider adding separate components for start/rsume of swaps // TODO: Consider adding separate components for start/resume of swaps
// swaps save peer id so we can resume // swaps save peer id so we can resume
match self.db.insert_peer_id(swap_id, bob_peer_id).await { match self.db.insert_peer_id(swap_id, bob_peer_id).await {
@ -354,17 +358,24 @@ where
/// Create a new [`EventLoopHandle`] that is scoped for communication with /// Create a new [`EventLoopHandle`] that is scoped for communication with
/// the given peer. /// the given peer.
fn new_handle(&mut self, peer: PeerId) -> EventLoopHandle { fn new_handle(&mut self, peer: PeerId, swap_id: Uuid) -> EventLoopHandle {
// we deliberately don't put timeouts on these channels because the swap always // we deliberately don't put timeouts on these channels because the swap always
// races these futures against a timelock // races these futures against a timelock
let (transfer_proof_sender, mut transfer_proof_receiver) = bmrng::channel(1); let (transfer_proof_sender, mut transfer_proof_receiver) = bmrng::channel(1);
let encrypted_signature = bmrng::channel(1); let encrypted_signature = bmrng::channel(1);
self.recv_encrypted_signature self.recv_encrypted_signature
.insert(peer, encrypted_signature.0); .insert(swap_id, encrypted_signature.0);
self.send_transfer_proof.push( self.send_transfer_proof.push(
async move { async move {
let (request, responder) = transfer_proof_receiver.recv().await?; let (transfer_proof, responder) = transfer_proof_receiver.recv().await?;
let request = transfer_proof::Request {
swap_id,
tx_lock_proof: transfer_proof,
};
Ok((peer, request, responder)) Ok((peer, request, responder))
} }
@ -442,13 +453,13 @@ impl LatestRate for KrakenRate {
#[derive(Debug)] #[derive(Debug)]
pub struct EventLoopHandle { pub struct EventLoopHandle {
recv_encrypted_signature: Option<bmrng::RequestReceiver<encrypted_signature::Request, ()>>, recv_encrypted_signature: Option<bmrng::RequestReceiver<bitcoin::EncryptedSignature, ()>>,
send_transfer_proof: Option<bmrng::RequestSender<transfer_proof::Request, ()>>, send_transfer_proof: Option<bmrng::RequestSender<monero::TransferProof, ()>>,
} }
impl EventLoopHandle { impl EventLoopHandle {
pub async fn recv_encrypted_signature(&mut self) -> Result<bitcoin::EncryptedSignature> { pub async fn recv_encrypted_signature(&mut self) -> Result<bitcoin::EncryptedSignature> {
let (request, responder) = self let (tx_redeem_encsig, responder) = self
.recv_encrypted_signature .recv_encrypted_signature
.take() .take()
.context("Encrypted signature was already received")? .context("Encrypted signature was already received")?
@ -459,14 +470,14 @@ impl EventLoopHandle {
.respond(()) .respond(())
.context("Failed to acknowledge receipt of encrypted signature")?; .context("Failed to acknowledge receipt of encrypted signature")?;
Ok(request.tx_redeem_encsig) Ok(tx_redeem_encsig)
} }
pub async fn send_transfer_proof(&mut self, msg: monero::TransferProof) -> Result<()> { pub async fn send_transfer_proof(&mut self, msg: monero::TransferProof) -> Result<()> {
self.send_transfer_proof self.send_transfer_proof
.take() .take()
.context("Transfer proof was already sent")? .context("Transfer proof was already sent")?
.send_receive(transfer_proof::Request { tx_lock_proof: msg }) .send_receive(msg)
.await .await
.context("Failed to send transfer proof")?; .context("Failed to send transfer proof")?;

View File

@ -4,18 +4,27 @@ use crate::protocol::{Message0, Message2, Message4};
use anyhow::{Context, Error}; use anyhow::{Context, Error};
use libp2p::PeerId; use libp2p::PeerId;
use libp2p_async_await::BehaviourOutEvent; use libp2p_async_await::BehaviourOutEvent;
use uuid::Uuid;
#[derive(Debug)] #[derive(Debug)]
pub enum OutEvent { pub enum OutEvent {
Done { bob_peer_id: PeerId, state3: State3 }, Done {
Failure { peer: PeerId, error: Error }, bob_peer_id: PeerId,
swap_id: Uuid,
state3: State3,
},
Failure {
peer: PeerId,
error: Error,
},
} }
impl From<BehaviourOutEvent<(PeerId, State3), (), Error>> for OutEvent { impl From<BehaviourOutEvent<(PeerId, (Uuid, State3)), (), Error>> for OutEvent {
fn from(event: BehaviourOutEvent<(PeerId, State3), (), Error>) -> Self { fn from(event: BehaviourOutEvent<(PeerId, (Uuid, State3)), (), Error>) -> Self {
match event { match event {
BehaviourOutEvent::Inbound(_, Ok((bob_peer_id, state3))) => OutEvent::Done { BehaviourOutEvent::Inbound(_, Ok((bob_peer_id, (swap_id, state3)))) => OutEvent::Done {
bob_peer_id, bob_peer_id,
swap_id,
state3, state3,
}, },
BehaviourOutEvent::Inbound(peer, Err(e)) => OutEvent::Failure { peer, error: e }, BehaviourOutEvent::Inbound(peer, Err(e)) => OutEvent::Failure { peer, error: e },
@ -27,7 +36,7 @@ impl From<BehaviourOutEvent<(PeerId, State3), (), Error>> for OutEvent {
#[derive(libp2p::NetworkBehaviour)] #[derive(libp2p::NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", event_process = false)] #[behaviour(out_event = "OutEvent", event_process = false)]
pub struct Behaviour { pub struct Behaviour {
inner: libp2p_async_await::Behaviour<(PeerId, State3), (), anyhow::Error>, inner: libp2p_async_await::Behaviour<(PeerId, (Uuid, State3)), (), anyhow::Error>,
} }
impl Default for Behaviour { impl Default for Behaviour {
@ -45,7 +54,7 @@ impl Behaviour {
let message0 = let message0 =
serde_cbor::from_slice::<Message0>(&substream.read_message(BUF_SIZE).await?) serde_cbor::from_slice::<Message0>(&substream.read_message(BUF_SIZE).await?)
.context("Failed to deserialize message0")?; .context("Failed to deserialize message0")?;
let state1 = state0.receive(message0)?; let (swap_id, state1) = state0.receive(message0)?;
substream substream
.write_message( .write_message(
@ -73,7 +82,7 @@ impl Behaviour {
.context("Failed to deserialize message4")?; .context("Failed to deserialize message4")?;
let state3 = state2.receive(message4)?; let state3 = state2.receive(message4)?;
Ok((bob, state3)) Ok((bob, (swap_id, state3)))
}) })
} }
} }

View File

@ -13,6 +13,7 @@ use rand::{CryptoRng, RngCore};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sigma_fun::ext::dl_secp256k1_ed25519_eq::CrossCurveDLEQProof; use sigma_fun::ext::dl_secp256k1_ed25519_eq::CrossCurveDLEQProof;
use std::fmt; use std::fmt;
use uuid::Uuid;
#[derive(Debug)] #[derive(Debug)]
pub enum AliceState { pub enum AliceState {
@ -146,7 +147,7 @@ impl State0 {
}) })
} }
pub fn receive(self, msg: Message0) -> Result<State1> { pub fn receive(self, msg: Message0) -> Result<(Uuid, State1)> {
let valid = CROSS_CURVE_PROOF_SYSTEM.verify( let valid = CROSS_CURVE_PROOF_SYSTEM.verify(
&msg.dleq_proof_s_b, &msg.dleq_proof_s_b,
( (
@ -164,7 +165,7 @@ impl State0 {
let v = self.v_a + msg.v_b; let v = self.v_a + msg.v_b;
Ok(State1 { Ok((msg.swap_id, State1 {
a: self.a, a: self.a,
B: msg.B, B: msg.B,
s_a: self.s_a, s_a: self.s_a,
@ -182,7 +183,7 @@ impl State0 {
refund_address: msg.refund_address, refund_address: msg.refund_address,
redeem_address: self.redeem_address, redeem_address: self.redeem_address,
punish_address: self.punish_address, punish_address: self.punish_address,
}) }))
} }
} }

View File

@ -5,7 +5,6 @@ use crate::env::Config;
use crate::protocol::alice; use crate::protocol::alice;
use crate::protocol::alice::event_loop::EventLoopHandle; use crate::protocol::alice::event_loop::EventLoopHandle;
use crate::protocol::alice::AliceState; use crate::protocol::alice::AliceState;
use crate::protocol::alice::AliceState::XmrLockTransferProofSent;
use crate::{bitcoin, database, monero}; use crate::{bitcoin, database, monero};
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use tokio::select; use tokio::select;
@ -16,7 +15,7 @@ pub async fn run(swap: alice::Swap) -> Result<AliceState> {
run_until(swap, |_| false).await run_until(swap, |_| false).await
} }
#[tracing::instrument(name = "swap", skip(swap,exit_early), fields(id = %swap.swap_id))] #[tracing::instrument(name = "swap", skip(swap,exit_early), fields(id = %swap.swap_id), err)]
pub async fn run_until( pub async fn run_until(
mut swap: alice::Swap, mut swap: alice::Swap,
exit_early: fn(&AliceState) -> bool, exit_early: fn(&AliceState) -> bool,
@ -127,7 +126,7 @@ async fn next_state(
result = event_loop_handle.send_transfer_proof(transfer_proof.clone()) => { result = event_loop_handle.send_transfer_proof(transfer_proof.clone()) => {
result?; result?;
XmrLockTransferProofSent { AliceState::XmrLockTransferProofSent {
monero_wallet_restore_blockheight, monero_wallet_restore_blockheight,
transfer_proof, transfer_proof,
state3, state3,

View File

@ -1,6 +1,6 @@
use crate::bitcoin::EncryptedSignature; use crate::bitcoin::EncryptedSignature;
use crate::network::quote::BidQuote; use crate::network::quote::BidQuote;
use crate::network::{encrypted_signature, spot_price, transfer_proof}; use crate::network::{encrypted_signature, spot_price};
use crate::protocol::bob::{Behaviour, OutEvent, State0, State2}; use crate::protocol::bob::{Behaviour, OutEvent, State0, State2};
use crate::{bitcoin, monero}; use crate::{bitcoin, monero};
use anyhow::{Context, Result}; use anyhow::{Context, Result};
@ -12,9 +12,11 @@ use libp2p::{PeerId, Swarm};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use uuid::Uuid;
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct EventLoop { pub struct EventLoop {
swap_id: Uuid,
swarm: libp2p::Swarm<Behaviour>, swarm: libp2p::Swarm<Behaviour>,
bitcoin_wallet: Arc<bitcoin::Wallet>, bitcoin_wallet: Arc<bitcoin::Wallet>,
alice_peer_id: PeerId, alice_peer_id: PeerId,
@ -22,7 +24,7 @@ pub struct EventLoop {
// these streams represents outgoing requests that we have to make // these streams represents outgoing requests that we have to make
quote_requests: bmrng::RequestReceiverStream<(), BidQuote>, quote_requests: bmrng::RequestReceiverStream<(), BidQuote>,
spot_price_requests: bmrng::RequestReceiverStream<spot_price::Request, spot_price::Response>, spot_price_requests: bmrng::RequestReceiverStream<spot_price::Request, spot_price::Response>,
encrypted_signature_requests: bmrng::RequestReceiverStream<encrypted_signature::Request, ()>, encrypted_signatures: bmrng::RequestReceiverStream<EncryptedSignature, ()>,
execution_setup_requests: bmrng::RequestReceiverStream<State0, Result<State2>>, execution_setup_requests: bmrng::RequestReceiverStream<State0, Result<State2>>,
// these represents requests that are currently in-flight. // these represents requests that are currently in-flight.
@ -34,7 +36,7 @@ pub struct EventLoop {
inflight_execution_setup: Option<bmrng::Responder<Result<State2>>>, inflight_execution_setup: Option<bmrng::Responder<Result<State2>>>,
/// The sender we will use to relay incoming transfer proofs. /// The sender we will use to relay incoming transfer proofs.
transfer_proof: bmrng::RequestSender<transfer_proof::Request, ()>, transfer_proof: bmrng::RequestSender<monero::TransferProof, ()>,
/// The future representing the successful handling of an incoming transfer /// The future representing the successful handling of an incoming transfer
/// proof. /// proof.
/// ///
@ -47,6 +49,7 @@ pub struct EventLoop {
impl EventLoop { impl EventLoop {
pub fn new( pub fn new(
swap_id: Uuid,
swarm: Swarm<Behaviour>, swarm: Swarm<Behaviour>,
alice_peer_id: PeerId, alice_peer_id: PeerId,
bitcoin_wallet: Arc<bitcoin::Wallet>, bitcoin_wallet: Arc<bitcoin::Wallet>,
@ -58,12 +61,13 @@ impl EventLoop {
let quote = bmrng::channel_with_timeout(1, Duration::from_secs(30)); let quote = bmrng::channel_with_timeout(1, Duration::from_secs(30));
let event_loop = EventLoop { let event_loop = EventLoop {
swap_id,
swarm, swarm,
alice_peer_id, alice_peer_id,
bitcoin_wallet, bitcoin_wallet,
execution_setup_requests: execution_setup.1.into(), execution_setup_requests: execution_setup.1.into(),
transfer_proof: transfer_proof.0, transfer_proof: transfer_proof.0,
encrypted_signature_requests: encrypted_signature.1.into(), encrypted_signatures: encrypted_signature.1.into(),
spot_price_requests: spot_price.1.into(), spot_price_requests: spot_price.1.into(),
quote_requests: quote.1.into(), quote_requests: quote.1.into(),
inflight_spot_price_requests: HashMap::default(), inflight_spot_price_requests: HashMap::default(),
@ -108,10 +112,20 @@ impl EventLoop {
} }
} }
SwarmEvent::Behaviour(OutEvent::TransferProofReceived { msg, channel }) => { SwarmEvent::Behaviour(OutEvent::TransferProofReceived { msg, channel }) => {
let mut responder = match self.transfer_proof.send(*msg).await { if msg.swap_id != self.swap_id {
// TODO: Save unexpected transfer proofs in the database and check for messages in the database when handling swaps
tracing::warn!("Received unexpected transfer proof for swap {} while running swap {}. This transfer proof will be ignored.", msg.swap_id, self.swap_id);
// When receiving a transfer proof that is unexpected we still have to acknowledge that it was received
let _ = self.swarm.transfer_proof.send_response(channel, ());
continue;
}
let mut responder = match self.transfer_proof.send(msg.tx_lock_proof).await {
Ok(responder) => responder, Ok(responder) => responder,
Err(_) => { Err(e) => {
tracing::warn!("Failed to pass on transfer proof"); tracing::warn!("Failed to pass on transfer proof: {:#}", e);
continue; continue;
} }
}; };
@ -180,7 +194,12 @@ impl EventLoop {
self.swarm.execution_setup.run(self.alice_peer_id, request, self.bitcoin_wallet.clone()); self.swarm.execution_setup.run(self.alice_peer_id, request, self.bitcoin_wallet.clone());
self.inflight_execution_setup = Some(responder); self.inflight_execution_setup = Some(responder);
}, },
Some((request, responder)) = self.encrypted_signature_requests.next().fuse(), if self.is_connected_to_alice() => { Some((tx_redeem_encsig, responder)) = self.encrypted_signatures.next().fuse(), if self.is_connected_to_alice() => {
let request = encrypted_signature::Request {
swap_id: self.swap_id,
tx_redeem_encsig
};
let id = self.swarm.encrypted_signature.send_request(&self.alice_peer_id, request); let id = self.swarm.encrypted_signature.send_request(&self.alice_peer_id, request);
self.inflight_encrypted_signature_requests.insert(id, responder); self.inflight_encrypted_signature_requests.insert(id, responder);
}, },
@ -202,8 +221,8 @@ impl EventLoop {
#[derive(Debug)] #[derive(Debug)]
pub struct EventLoopHandle { pub struct EventLoopHandle {
execution_setup: bmrng::RequestSender<State0, Result<State2>>, execution_setup: bmrng::RequestSender<State0, Result<State2>>,
transfer_proof: bmrng::RequestReceiver<transfer_proof::Request, ()>, transfer_proof: bmrng::RequestReceiver<monero::TransferProof, ()>,
encrypted_signature: bmrng::RequestSender<encrypted_signature::Request, ()>, encrypted_signature: bmrng::RequestSender<EncryptedSignature, ()>,
spot_price: bmrng::RequestSender<spot_price::Request, spot_price::Response>, spot_price: bmrng::RequestSender<spot_price::Request, spot_price::Response>,
quote: bmrng::RequestSender<(), BidQuote>, quote: bmrng::RequestSender<(), BidQuote>,
} }
@ -213,8 +232,8 @@ impl EventLoopHandle {
self.execution_setup.send_receive(state0).await? self.execution_setup.send_receive(state0).await?
} }
pub async fn recv_transfer_proof(&mut self) -> Result<transfer_proof::Request> { pub async fn recv_transfer_proof(&mut self) -> Result<monero::TransferProof> {
let (request, responder) = self let (transfer_proof, responder) = self
.transfer_proof .transfer_proof
.recv() .recv()
.await .await
@ -223,7 +242,7 @@ impl EventLoopHandle {
.respond(()) .respond(())
.context("Failed to acknowledge receipt of transfer proof")?; .context("Failed to acknowledge receipt of transfer proof")?;
Ok(request) Ok(transfer_proof)
} }
pub async fn request_spot_price(&mut self, btc: bitcoin::Amount) -> Result<monero::Amount> { pub async fn request_spot_price(&mut self, btc: bitcoin::Amount) -> Result<monero::Amount> {
@ -244,7 +263,7 @@ impl EventLoopHandle {
) -> Result<()> { ) -> Result<()> {
Ok(self Ok(self
.encrypted_signature .encrypted_signature
.send_receive(encrypted_signature::Request { tx_redeem_encsig }) .send_receive(tx_redeem_encsig)
.await?) .await?)
} }
} }

View File

@ -17,6 +17,7 @@ use serde::{Deserialize, Serialize};
use sha2::Sha256; use sha2::Sha256;
use sigma_fun::ext::dl_secp256k1_ed25519_eq::CrossCurveDLEQProof; use sigma_fun::ext::dl_secp256k1_ed25519_eq::CrossCurveDLEQProof;
use std::fmt; use std::fmt;
use uuid::Uuid;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum BobState { pub enum BobState {
@ -69,6 +70,7 @@ impl fmt::Display for BobState {
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
pub struct State0 { pub struct State0 {
swap_id: Uuid,
b: bitcoin::SecretKey, b: bitcoin::SecretKey,
s_b: monero::Scalar, s_b: monero::Scalar,
S_b_monero: monero::PublicKey, S_b_monero: monero::PublicKey,
@ -84,7 +86,9 @@ pub struct State0 {
} }
impl State0 { impl State0 {
#[allow(clippy::too_many_arguments)]
pub fn new<R: RngCore + CryptoRng>( pub fn new<R: RngCore + CryptoRng>(
swap_id: Uuid,
rng: &mut R, rng: &mut R,
btc: bitcoin::Amount, btc: bitcoin::Amount,
xmr: monero::Amount, xmr: monero::Amount,
@ -101,6 +105,7 @@ impl State0 {
let (dleq_proof_s_b, (S_b_bitcoin, S_b_monero)) = CROSS_CURVE_PROOF_SYSTEM.prove(&s_b, rng); let (dleq_proof_s_b, (S_b_bitcoin, S_b_monero)) = CROSS_CURVE_PROOF_SYSTEM.prove(&s_b, rng);
Self { Self {
swap_id,
b, b,
s_b, s_b,
v_b, v_b,
@ -120,6 +125,7 @@ impl State0 {
pub fn next_message(&self) -> Message0 { pub fn next_message(&self) -> Message0 {
Message0 { Message0 {
swap_id: self.swap_id,
B: self.b.public(), B: self.b.public(),
S_b_monero: self.S_b_monero, S_b_monero: self.S_b_monero,
S_b_bitcoin: self.S_b_bitcoin, S_b_bitcoin: self.S_b_bitcoin,

View File

@ -8,6 +8,7 @@ use crate::{bitcoin, monero};
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use rand::rngs::OsRng; use rand::rngs::OsRng;
use tokio::select; use tokio::select;
use uuid::Uuid;
pub fn is_complete(state: &BobState) -> bool { pub fn is_complete(state: &BobState) -> bool {
matches!( matches!(
@ -32,6 +33,7 @@ pub async fn run_until(
while !is_target_state(&current_state) { while !is_target_state(&current_state) {
current_state = next_state( current_state = next_state(
swap.swap_id,
current_state, current_state,
&mut swap.event_loop_handle, &mut swap.event_loop_handle,
swap.bitcoin_wallet.as_ref(), swap.bitcoin_wallet.as_ref(),
@ -51,6 +53,7 @@ pub async fn run_until(
} }
async fn next_state( async fn next_state(
swap_id: Uuid,
state: BobState, state: BobState,
event_loop_handle: &mut EventLoopHandle, event_loop_handle: &mut EventLoopHandle,
bitcoin_wallet: &bitcoin::Wallet, bitcoin_wallet: &bitcoin::Wallet,
@ -65,6 +68,7 @@ async fn next_state(
let bitcoin_refund_address = bitcoin_wallet.new_address().await?; let bitcoin_refund_address = bitcoin_wallet.new_address().await?;
let state2 = request_price_and_setup( let state2 = request_price_and_setup(
swap_id,
btc_amount, btc_amount,
event_loop_handle, event_loop_handle,
env_config, env_config,
@ -103,7 +107,7 @@ async fn next_state(
select! { select! {
transfer_proof = transfer_proof_watcher => { transfer_proof = transfer_proof_watcher => {
let transfer_proof = transfer_proof?.tx_lock_proof; let transfer_proof = transfer_proof?;
tracing::info!(txid = %transfer_proof.tx_hash(), "Alice locked Monero"); tracing::info!(txid = %transfer_proof.tx_hash(), "Alice locked Monero");
@ -244,6 +248,7 @@ async fn next_state(
} }
pub async fn request_price_and_setup( pub async fn request_price_and_setup(
swap_id: Uuid,
btc: bitcoin::Amount, btc: bitcoin::Amount,
event_loop_handle: &mut EventLoopHandle, event_loop_handle: &mut EventLoopHandle,
env_config: &Config, env_config: &Config,
@ -254,6 +259,7 @@ pub async fn request_price_and_setup(
tracing::info!("Spot price for {} is {}", btc, xmr); tracing::info!("Spot price for {} is {}", btc, xmr);
let state0 = State0::new( let state0 = State0::new(
swap_id,
&mut OsRng, &mut OsRng,
btc, btc,
xmr, xmr,

View File

@ -13,6 +13,7 @@ use swap::protocol::{alice, bob};
async fn alice_punishes_after_restart_if_punish_timelock_expired() { async fn alice_punishes_after_restart_if_punish_timelock_expired() {
harness::setup_test(FastPunishConfig, |mut ctx| async move { harness::setup_test(FastPunishConfig, |mut ctx| async move {
let (bob_swap, bob_join_handle) = ctx.bob_swap().await; let (bob_swap, bob_join_handle) = ctx.bob_swap().await;
let bob_swap_id = bob_swap.swap_id;
let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked));
let alice_swap = ctx.alice_next_swap().await; let alice_swap = ctx.alice_next_swap().await;
@ -33,11 +34,7 @@ async fn alice_punishes_after_restart_if_punish_timelock_expired() {
.wait_until_confirmed_with(state3.punish_timelock) .wait_until_confirmed_with(state3.punish_timelock)
.await?; .await?;
} else { } else {
panic!( panic!("Alice in unexpected state {}", alice_state);
"\
Alice in unexpected state {}",
alice_state
);
} }
ctx.restart_alice().await; ctx.restart_alice().await;
@ -49,7 +46,9 @@ async fn alice_punishes_after_restart_if_punish_timelock_expired() {
// Restart Bob after Alice punished to ensure Bob transitions to // Restart Bob after Alice punished to ensure Bob transitions to
// punished and does not run indefinitely // punished and does not run indefinitely
let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let (bob_swap, _) = ctx
.stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id)
.await;
assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); assert!(matches!(bob_swap.state, BobState::BtcLocked { .. }));
let bob_state = bob::run(bob_swap).await?; let bob_state = bob::run(bob_swap).await?;

View File

@ -9,6 +9,7 @@ use swap::protocol::{alice, bob};
async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() { async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() {
harness::setup_test(FastCancelConfig, |mut ctx| async move { harness::setup_test(FastCancelConfig, |mut ctx| async move {
let (bob_swap, bob_join_handle) = ctx.bob_swap().await; let (bob_swap, bob_join_handle) = ctx.bob_swap().await;
let bob_swap_id = bob_swap.swap_id;
let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked));
let alice_swap = ctx.alice_next_swap().await; let alice_swap = ctx.alice_next_swap().await;
@ -17,7 +18,9 @@ async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() {
let bob_state = bob_swap.await??; let bob_state = bob_swap.await??;
assert!(matches!(bob_state, BobState::BtcLocked { .. })); assert!(matches!(bob_state, BobState::BtcLocked { .. }));
let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let (bob_swap, bob_join_handle) = ctx
.stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id)
.await;
// Ensure cancel timelock is expired // Ensure cancel timelock is expired
if let BobState::BtcLocked(state3) = bob_swap.state.clone() { if let BobState::BtcLocked(state3) = bob_swap.state.clone() {
@ -43,7 +46,9 @@ async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() {
.await??; .await??;
assert!(matches!(state, BobState::BtcCancelled { .. })); assert!(matches!(state, BobState::BtcCancelled { .. }));
let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let (bob_swap, bob_join_handle) = ctx
.stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id)
.await;
assert!(matches!(bob_swap.state, BobState::BtcCancelled { .. })); assert!(matches!(bob_swap.state, BobState::BtcCancelled { .. }));
// Bob manually refunds // Bob manually refunds

View File

@ -10,6 +10,7 @@ use swap::protocol::{alice, bob};
async fn given_bob_manually_cancels_when_timelock_not_expired_errors() { async fn given_bob_manually_cancels_when_timelock_not_expired_errors() {
harness::setup_test(SlowCancelConfig, |mut ctx| async move { harness::setup_test(SlowCancelConfig, |mut ctx| async move {
let (bob_swap, bob_join_handle) = ctx.bob_swap().await; let (bob_swap, bob_join_handle) = ctx.bob_swap().await;
let bob_swap_id = bob_swap.swap_id;
let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked));
let alice_swap = ctx.alice_next_swap().await; let alice_swap = ctx.alice_next_swap().await;
@ -18,7 +19,9 @@ async fn given_bob_manually_cancels_when_timelock_not_expired_errors() {
let bob_state = bob_swap.await??; let bob_state = bob_swap.await??;
assert!(matches!(bob_state, BobState::BtcLocked { .. })); assert!(matches!(bob_state, BobState::BtcLocked { .. }));
let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let (bob_swap, bob_join_handle) = ctx
.stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id)
.await;
assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); assert!(matches!(bob_swap.state, BobState::BtcLocked { .. }));
// Bob tries but fails to manually cancel // Bob tries but fails to manually cancel
@ -35,7 +38,9 @@ async fn given_bob_manually_cancels_when_timelock_not_expired_errors() {
assert!(matches!(result, Error::CancelTimelockNotExpiredYet)); assert!(matches!(result, Error::CancelTimelockNotExpiredYet));
let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let (bob_swap, bob_join_handle) = ctx
.stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id)
.await;
assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); assert!(matches!(bob_swap.state, BobState::BtcLocked { .. }));
// Bob tries but fails to manually refund // Bob tries but fails to manually refund
@ -50,7 +55,9 @@ async fn given_bob_manually_cancels_when_timelock_not_expired_errors() {
.err() .err()
.unwrap(); .unwrap();
let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let (bob_swap, _) = ctx
.stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id)
.await;
assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); assert!(matches!(bob_swap.state, BobState::BtcLocked { .. }));
Ok(()) Ok(())

View File

@ -9,6 +9,7 @@ use swap::protocol::{alice, bob};
async fn given_bob_manually_forces_cancel_when_timelock_not_expired_errors() { async fn given_bob_manually_forces_cancel_when_timelock_not_expired_errors() {
harness::setup_test(SlowCancelConfig, |mut ctx| async move { harness::setup_test(SlowCancelConfig, |mut ctx| async move {
let (bob_swap, bob_join_handle) = ctx.bob_swap().await; let (bob_swap, bob_join_handle) = ctx.bob_swap().await;
let bob_swap_id = bob_swap.swap_id;
let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked));
let alice_swap = ctx.alice_next_swap().await; let alice_swap = ctx.alice_next_swap().await;
@ -17,7 +18,9 @@ async fn given_bob_manually_forces_cancel_when_timelock_not_expired_errors() {
let bob_state = bob_swap.await??; let bob_state = bob_swap.await??;
assert!(matches!(bob_state, BobState::BtcLocked { .. })); assert!(matches!(bob_state, BobState::BtcLocked { .. }));
let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let (bob_swap, bob_join_handle) = ctx
.stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id)
.await;
assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); assert!(matches!(bob_swap.state, BobState::BtcLocked { .. }));
// Bob forces a cancel that will fail // Bob forces a cancel that will fail
@ -33,7 +36,9 @@ async fn given_bob_manually_forces_cancel_when_timelock_not_expired_errors() {
assert!(is_error); assert!(is_error);
let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let (bob_swap, bob_join_handle) = ctx
.stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id)
.await;
assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); assert!(matches!(bob_swap.state, BobState::BtcLocked { .. }));
// Bob forces a refund that will fail // Bob forces a refund that will fail
@ -48,7 +53,9 @@ async fn given_bob_manually_forces_cancel_when_timelock_not_expired_errors() {
.is_err(); .is_err();
assert!(is_error); assert!(is_error);
let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let (bob_swap, _) = ctx
.stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id)
.await;
assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); assert!(matches!(bob_swap.state, BobState::BtcLocked { .. }));
Ok(()) Ok(())

View File

@ -0,0 +1,60 @@
pub mod harness;
use harness::bob_run_until::is_xmr_locked;
use harness::SlowCancelConfig;
use swap::protocol::alice::AliceState;
use swap::protocol::bob::BobState;
use swap::protocol::{alice, bob};
#[tokio::test]
async fn concurrent_bobs_after_xmr_lock_proof_sent() {
harness::setup_test(SlowCancelConfig, |mut ctx| async move {
let (bob_swap_1, bob_join_handle_1) = ctx.bob_swap().await;
let swap_id = bob_swap_1.swap_id;
let bob_swap_1 = tokio::spawn(bob::run_until(bob_swap_1, is_xmr_locked));
let alice_swap_1 = ctx.alice_next_swap().await;
let alice_swap_1 = tokio::spawn(alice::run(alice_swap_1));
let bob_state_1 = bob_swap_1.await??;
assert!(matches!(bob_state_1, BobState::XmrLocked { .. }));
// make sure bob_swap_1's event loop is gone
bob_join_handle_1.abort();
let (bob_swap_2, bob_join_handle_2) = ctx.bob_swap().await;
let bob_swap_2 = tokio::spawn(bob::run(bob_swap_2));
let alice_swap_2 = ctx.alice_next_swap().await;
let alice_swap_2 = tokio::spawn(alice::run(alice_swap_2));
// The 2nd swap should ALWAYS finish successfully in this
// scenario
let bob_state_2 = bob_swap_2.await??;
assert!(matches!(bob_state_2, BobState::XmrRedeemed { .. }));
let alice_state_2 = alice_swap_2.await??;
assert!(matches!(alice_state_2, AliceState::BtcRedeemed { .. }));
let (bob_swap_1, _) = ctx
.stop_and_resume_bob_from_db(bob_join_handle_2, swap_id)
.await;
assert!(matches!(bob_swap_1.state, BobState::XmrLocked { .. }));
// The 1st (paused) swap ALWAYS finishes successfully in this
// scenario, because it is ensured that Bob already received the
// transfer proof.
let bob_state_1 = bob::run(bob_swap_1).await?;
assert!(matches!(bob_state_1, BobState::XmrRedeemed { .. }));
let alice_state_1 = alice_swap_1.await??;
assert!(matches!(alice_state_1, AliceState::BtcRedeemed { .. }));
Ok(())
})
.await;
}

View File

@ -0,0 +1,61 @@
pub mod harness;
use harness::bob_run_until::is_btc_locked;
use harness::SlowCancelConfig;
use swap::protocol::alice::AliceState;
use swap::protocol::bob::BobState;
use swap::protocol::{alice, bob};
#[tokio::test]
async fn concurrent_bobs_before_xmr_lock_proof_sent() {
harness::setup_test(SlowCancelConfig, |mut ctx| async move {
let (bob_swap_1, bob_join_handle_1) = ctx.bob_swap().await;
let swap_id = bob_swap_1.swap_id;
let bob_swap_1 = tokio::spawn(bob::run_until(bob_swap_1, is_btc_locked));
let alice_swap_1 = ctx.alice_next_swap().await;
let alice_swap_1 = tokio::spawn(alice::run(alice_swap_1));
let bob_state_1 = bob_swap_1.await??;
assert!(matches!(bob_state_1, BobState::BtcLocked(_)));
// make sure bob_swap_1's event loop is gone
bob_join_handle_1.abort();
let (bob_swap_2, bob_join_handle_2) = ctx.bob_swap().await;
let bob_swap_2 = tokio::spawn(bob::run(bob_swap_2));
let alice_swap_2 = ctx.alice_next_swap().await;
let alice_swap_2 = tokio::spawn(alice::run(alice_swap_2));
// The 2nd swap ALWAYS finish successfully in this
// scenario, but will receive an "unwanted" transfer proof that is ignored in
// the event loop.
let bob_state_2 = bob_swap_2.await??;
assert!(matches!(bob_state_2, BobState::XmrRedeemed { .. }));
let alice_state_2 = alice_swap_2.await??;
assert!(matches!(alice_state_2, AliceState::BtcRedeemed { .. }));
let (bob_swap_1, _) = ctx
.stop_and_resume_bob_from_db(bob_join_handle_2, swap_id)
.await;
assert!(matches!(bob_state_1, BobState::BtcLocked(_)));
// The 1st (paused) swap is expected to refund, because the transfer
// proof is delivered to the wrong swap, and we currently don't store it in the
// database for the other swap.
let bob_state_1 = bob::run(bob_swap_1).await?;
assert!(matches!(bob_state_1, BobState::BtcRefunded { .. }));
let alice_state_1 = alice_swap_1.await??;
assert!(matches!(alice_state_1, AliceState::XmrRefunded { .. }));
Ok(())
})
.await;
}

View File

@ -0,0 +1,21 @@
pub mod harness;
use harness::SlowCancelConfig;
use swap::protocol::bob;
#[tokio::test]
async fn ensure_same_swap_id_for_alice_and_bob() {
harness::setup_test(SlowCancelConfig, |mut ctx| async move {
let (bob_swap, _) = ctx.bob_swap().await;
let bob_swap_id = bob_swap.swap_id;
let _ = tokio::spawn(bob::run(bob_swap));
// once Bob's swap is spawned we can retrieve Alice's swap and assert on the
// swap ID
let alice_swap = ctx.alice_next_swap().await;
assert_eq!(alice_swap.swap_id, bob_swap_id);
Ok(())
})
.await;
}

View File

@ -9,6 +9,7 @@ use swap::protocol::{alice, bob};
async fn given_bob_restarts_after_xmr_is_locked_resume_swap() { async fn given_bob_restarts_after_xmr_is_locked_resume_swap() {
harness::setup_test(SlowCancelConfig, |mut ctx| async move { harness::setup_test(SlowCancelConfig, |mut ctx| async move {
let (bob_swap, bob_join_handle) = ctx.bob_swap().await; let (bob_swap, bob_join_handle) = ctx.bob_swap().await;
let bob_swap_id = bob_swap.swap_id;
let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_xmr_locked)); let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_xmr_locked));
let alice_swap = ctx.alice_next_swap().await; let alice_swap = ctx.alice_next_swap().await;
@ -18,7 +19,9 @@ async fn given_bob_restarts_after_xmr_is_locked_resume_swap() {
assert!(matches!(bob_state, BobState::XmrLocked { .. })); assert!(matches!(bob_state, BobState::XmrLocked { .. }));
let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let (bob_swap, _) = ctx
.stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id)
.await;
assert!(matches!(bob_swap.state, BobState::XmrLocked { .. })); assert!(matches!(bob_swap.state, BobState::XmrLocked { .. }));
let bob_state = bob::run(bob_swap).await?; let bob_state = bob::run(bob_swap).await?;

View File

@ -9,6 +9,7 @@ use swap::protocol::{alice, bob};
async fn given_bob_restarts_after_xmr_is_locked_resume_swap() { async fn given_bob_restarts_after_xmr_is_locked_resume_swap() {
harness::setup_test(SlowCancelConfig, |mut ctx| async move { harness::setup_test(SlowCancelConfig, |mut ctx| async move {
let (bob_swap, bob_join_handle) = ctx.bob_swap().await; let (bob_swap, bob_join_handle) = ctx.bob_swap().await;
let bob_swap_id = bob_swap.swap_id;
let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_xmr_locked)); let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_xmr_locked));
let alice_swap = ctx.alice_next_swap().await; let alice_swap = ctx.alice_next_swap().await;
@ -18,7 +19,9 @@ async fn given_bob_restarts_after_xmr_is_locked_resume_swap() {
assert!(matches!(bob_state, BobState::XmrLocked { .. })); assert!(matches!(bob_state, BobState::XmrLocked { .. }));
let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let (bob_swap, _) = ctx
.stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id)
.await;
assert!(matches!(bob_swap.state, BobState::XmrLocked { .. })); assert!(matches!(bob_swap.state, BobState::XmrLocked { .. }));
let bob_state = bob::run(bob_swap).await?; let bob_state = bob::run(bob_swap).await?;

View File

@ -43,13 +43,52 @@ const BITCOIN_TEST_WALLET_NAME: &str = "testwallet";
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct StartingBalances { pub struct StartingBalances {
pub xmr: monero::Amount, pub xmr: monero::Amount,
pub xmr_outputs: Vec<monero::Amount>,
pub btc: bitcoin::Amount, pub btc: bitcoin::Amount,
} }
impl StartingBalances {
/// If monero_outputs is specified the monero balance will be:
/// monero_outputs * new_xmr = self_xmr
pub fn new(btc: bitcoin::Amount, xmr: monero::Amount, monero_outputs: Option<u64>) -> Self {
match monero_outputs {
None => {
if xmr == monero::Amount::ZERO {
return Self {
xmr,
xmr_outputs: vec![],
btc,
};
}
Self {
xmr,
xmr_outputs: vec![xmr],
btc,
}
}
Some(outputs) => {
let mut xmr_outputs = Vec::new();
let mut sum_xmr = monero::Amount::ZERO;
for _ in 0..outputs {
xmr_outputs.push(xmr);
sum_xmr = sum_xmr + xmr;
}
Self {
xmr: sum_xmr,
xmr_outputs,
btc,
}
}
}
}
}
struct BobParams { struct BobParams {
seed: Seed, seed: Seed,
db_path: PathBuf, db_path: PathBuf,
swap_id: Uuid,
bitcoin_wallet: Arc<bitcoin::Wallet>, bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>, monero_wallet: Arc<monero::Wallet>,
alice_address: Multiaddr, alice_address: Multiaddr,
@ -58,12 +97,16 @@ struct BobParams {
} }
impl BobParams { impl BobParams {
pub async fn builder(&self, event_loop_handle: bob::EventLoopHandle) -> Result<bob::Builder> { pub async fn builder(
&self,
event_loop_handle: bob::EventLoopHandle,
swap_id: Uuid,
) -> Result<bob::Builder> {
let receive_address = self.monero_wallet.get_main_address(); let receive_address = self.monero_wallet.get_main_address();
Ok(bob::Builder::new( Ok(bob::Builder::new(
Database::open(&self.db_path.clone().as_path()).unwrap(), Database::open(&self.db_path.clone().as_path()).unwrap(),
self.swap_id, swap_id,
self.bitcoin_wallet.clone(), self.bitcoin_wallet.clone(),
self.monero_wallet.clone(), self.monero_wallet.clone(),
self.env_config, self.env_config,
@ -72,11 +115,16 @@ impl BobParams {
)) ))
} }
pub fn new_eventloop(&self) -> Result<(bob::EventLoop, bob::EventLoopHandle)> { pub fn new_eventloop(&self, swap_id: Uuid) -> Result<(bob::EventLoop, bob::EventLoopHandle)> {
let mut swarm = swarm::new::<bob::Behaviour>(&self.seed)?; let mut swarm = swarm::new::<bob::Behaviour>(&self.seed)?;
swarm.add_address(self.alice_peer_id, self.alice_address.clone()); swarm.add_address(self.alice_peer_id, self.alice_address.clone());
bob::EventLoop::new(swarm, self.alice_peer_id, self.bitcoin_wallet.clone()) bob::EventLoop::new(
swap_id,
swarm,
self.alice_peer_id,
self.bitcoin_wallet.clone(),
)
} }
} }
@ -139,24 +187,28 @@ impl TestContext {
} }
pub async fn alice_next_swap(&mut self) -> alice::Swap { pub async fn alice_next_swap(&mut self) -> alice::Swap {
timeout(Duration::from_secs(10), self.alice_swap_handle.recv()) timeout(Duration::from_secs(20), self.alice_swap_handle.recv())
.await .await
.expect("No Alice swap within 10 seconds, aborting because this test is waiting for a swap forever...") .expect("No Alice swap within 20 seconds, aborting because this test is likely waiting for a swap forever...")
.unwrap() .unwrap()
} }
pub async fn bob_swap(&mut self) -> (bob::Swap, BobApplicationHandle) { pub async fn bob_swap(&mut self) -> (bob::Swap, BobApplicationHandle) {
let (event_loop, event_loop_handle) = self.bob_params.new_eventloop().unwrap(); let swap_id = Uuid::new_v4();
let (event_loop, event_loop_handle) = self.bob_params.new_eventloop(swap_id).unwrap();
let swap = self let swap = self
.bob_params .bob_params
.builder(event_loop_handle) .builder(event_loop_handle, swap_id)
.await .await
.unwrap() .unwrap()
.with_init_params(self.btc_amount) .with_init_params(self.btc_amount)
.build() .build()
.unwrap(); .unwrap();
// ensure the wallet is up to date for concurrent swap tests
swap.bitcoin_wallet.sync().await.unwrap();
let join_handle = tokio::spawn(event_loop.run()); let join_handle = tokio::spawn(event_loop.run());
(swap, BobApplicationHandle(join_handle)) (swap, BobApplicationHandle(join_handle))
@ -165,14 +217,15 @@ impl TestContext {
pub async fn stop_and_resume_bob_from_db( pub async fn stop_and_resume_bob_from_db(
&mut self, &mut self,
join_handle: BobApplicationHandle, join_handle: BobApplicationHandle,
swap_id: Uuid,
) -> (bob::Swap, BobApplicationHandle) { ) -> (bob::Swap, BobApplicationHandle) {
join_handle.abort(); join_handle.abort();
let (event_loop, event_loop_handle) = self.bob_params.new_eventloop().unwrap(); let (event_loop, event_loop_handle) = self.bob_params.new_eventloop(swap_id).unwrap();
let swap = self let swap = self
.bob_params .bob_params
.builder(event_loop_handle) .builder(event_loop_handle, swap_id)
.await .await
.unwrap() .unwrap()
.build() .build()
@ -489,14 +542,13 @@ where
let env_config = C::get_config(); let env_config = C::get_config();
let (monero, containers) = harness::init_containers(&cli).await; let (monero, containers) = harness::init_containers(&cli).await;
monero.init_miner().await.unwrap();
let btc_amount = bitcoin::Amount::from_sat(1_000_000); let btc_amount = bitcoin::Amount::from_sat(1_000_000);
let xmr_amount = monero::Amount::from_monero(btc_amount.as_btc() / FixedRate::RATE).unwrap(); let xmr_amount = monero::Amount::from_monero(btc_amount.as_btc() / FixedRate::RATE).unwrap();
let alice_starting_balances = StartingBalances { let alice_starting_balances =
xmr: xmr_amount * 10, StartingBalances::new(bitcoin::Amount::ZERO, xmr_amount, Some(10));
btc: bitcoin::Amount::ZERO,
};
let electrs_rpc_port = containers let electrs_rpc_port = containers
.electrs .electrs
@ -532,10 +584,7 @@ where
); );
let bob_seed = Seed::random().unwrap(); let bob_seed = Seed::random().unwrap();
let bob_starting_balances = StartingBalances { let bob_starting_balances = StartingBalances::new(btc_amount * 10, monero::Amount::ZERO, None);
xmr: monero::Amount::ZERO,
btc: btc_amount * 10,
};
let (bob_bitcoin_wallet, bob_monero_wallet) = init_test_wallets( let (bob_bitcoin_wallet, bob_monero_wallet) = init_test_wallets(
MONERO_WALLET_NAME_BOB, MONERO_WALLET_NAME_BOB,
@ -552,7 +601,6 @@ where
let bob_params = BobParams { let bob_params = BobParams {
seed: Seed::random().unwrap(), seed: Seed::random().unwrap(),
db_path: tempdir().unwrap().path().to_path_buf(), db_path: tempdir().unwrap().path().to_path_buf(),
swap_id: Uuid::new_v4(),
bitcoin_wallet: bob_bitcoin_wallet.clone(), bitcoin_wallet: bob_bitcoin_wallet.clone(),
monero_wallet: bob_monero_wallet.clone(), monero_wallet: bob_monero_wallet.clone(),
alice_address: alice_listen_address.clone(), alice_address: alice_listen_address.clone(),
@ -560,6 +608,8 @@ where
env_config, env_config,
}; };
monero.start_miner().await.unwrap();
let test = TestContext { let test = TestContext {
env_config, env_config,
btc_amount, btc_amount,
@ -774,7 +824,14 @@ async fn init_test_wallets(
env_config: Config, env_config: Config,
) -> (Arc<bitcoin::Wallet>, Arc<monero::Wallet>) { ) -> (Arc<bitcoin::Wallet>, Arc<monero::Wallet>) {
monero monero
.init(vec![(name, starting_balances.xmr.as_piconero())]) .init_wallet(
name,
starting_balances
.xmr_outputs
.into_iter()
.map(|amount| amount.as_piconero())
.collect(),
)
.await .await
.unwrap(); .unwrap();

View File

@ -11,6 +11,7 @@ use swap::protocol::{alice, bob};
async fn alice_punishes_if_bob_never_acts_after_fund() { async fn alice_punishes_if_bob_never_acts_after_fund() {
harness::setup_test(FastPunishConfig, |mut ctx| async move { harness::setup_test(FastPunishConfig, |mut ctx| async move {
let (bob_swap, bob_join_handle) = ctx.bob_swap().await; let (bob_swap, bob_join_handle) = ctx.bob_swap().await;
let bob_swap_id = bob_swap.swap_id;
let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked));
let alice_swap = ctx.alice_next_swap().await; let alice_swap = ctx.alice_next_swap().await;
@ -24,7 +25,9 @@ async fn alice_punishes_if_bob_never_acts_after_fund() {
// Restart Bob after Alice punished to ensure Bob transitions to // Restart Bob after Alice punished to ensure Bob transitions to
// punished and does not run indefinitely // punished and does not run indefinitely
let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let (bob_swap, _) = ctx
.stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id)
.await;
assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); assert!(matches!(bob_swap.state, BobState::BtcLocked { .. }));
let bob_state = bob::run(bob_swap).await?; let bob_state = bob::run(bob_swap).await?;