405: Concurrent swaps with same peer r=da-kami a=da-kami

Fixes #367 

- [x] Concurrent swaps with same peer

Not sure how much more time I should invest into this. We could just merge the current state and then do improvements on top...?

Improvements:

- [x] Think `// TODO: Remove unnecessary swap-id check` through and remove it
- [x] Add concurrent swap test, multiple swaps with same Bob
- [ ] Save swap messages without matching swap in execution in the database
- [ ] Assert the balances in the new concurrent swap tests
- [ ] ~~Add concurrent swap test, multiple swaps with different Bobs~~
- [ ] ~~Send swap-id in separate message, not on top of `Message0`~~

Co-authored-by: Daniel Karzel <daniel@comit.network>
This commit is contained in:
bors[bot] 2021-04-13 08:50:44 +00:00 committed by GitHub
commit 19766b9759
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
31 changed files with 456 additions and 127 deletions

View file

@ -116,7 +116,10 @@ jobs:
bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force, bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force,
punish, punish,
alice_punishes_after_restart_punish_timelock_expired, alice_punishes_after_restart_punish_timelock_expired,
alice_refunds_after_restart_bob_refunded alice_refunds_after_restart_bob_refunded,
ensure_same_swap_id,
concurrent_bobs_after_xmr_lock_proof_sent,
concurrent_bobs_before_xmr_lock_proof_sent
] ]
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:

View file

@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
### Changed
- Allow multiple concurrent swaps with the same peer on the ASB.
This is a breaking change because the swap ID is now agreed upon between CLI and ASB during swap setup.
Resuming swaps started prior to this change can result in unexpected behaviour.
## [0.4.0] - 2021-04-06 ## [0.4.0] - 2021-04-06
### Changed ### Changed

View file

@ -15,5 +15,8 @@ status = [
"docker_tests (bob_refunds_using_cancel_and_refund_command_timelock_not_expired)", "docker_tests (bob_refunds_using_cancel_and_refund_command_timelock_not_expired)",
"docker_tests (punish)", "docker_tests (punish)",
"docker_tests (alice_punishes_after_restart_punish_timelock_expired)", "docker_tests (alice_punishes_after_restart_punish_timelock_expired)",
"docker_tests (alice_refunds_after_restart_bob_refunded)" "docker_tests (alice_refunds_after_restart_bob_refunded)",
"docker_tests (ensure_same_swap_id)",
"docker_tests (concurrent_bobs_after_xmr_lock_proof_sent)",
"docker_tests (concurrent_bobs_before_xmr_lock_proof_sent)"
] ]

View file

@ -75,7 +75,7 @@ impl Image for Monero {
impl Default for Monero { impl Default for Monero {
fn default() -> Self { fn default() -> Self {
Monero { Monero {
tag: "v0.16.0.3".into(), tag: "v0.17.2.0".into(),
args: Args::default(), args: Args::default(),
entrypoint: Some("".into()), entrypoint: Some("".into()),
wait_for_message: "core RPC server started ok".to_string(), wait_for_message: "core RPC server started ok".to_string(),

View file

@ -68,13 +68,15 @@ impl<'c> Monero {
let miner = "miner"; let miner = "miner";
tracing::info!("Starting miner wallet: {}", miner); tracing::info!("Starting miner wallet: {}", miner);
let (miner_wallet, miner_container) = MoneroWalletRpc::new(cli, &miner, &monerod).await?; let (miner_wallet, miner_container) =
MoneroWalletRpc::new(cli, &miner, &monerod, prefix.clone()).await?;
wallets.push(miner_wallet); wallets.push(miner_wallet);
containers.push(miner_container); containers.push(miner_container);
for wallet in additional_wallets.iter() { for wallet in additional_wallets.iter() {
tracing::info!("Starting wallet: {}", wallet); tracing::info!("Starting wallet: {}", wallet);
let (wallet, container) = MoneroWalletRpc::new(cli, &wallet, &monerod).await?; let (wallet, container) =
MoneroWalletRpc::new(cli, &wallet, &monerod, prefix.clone()).await?;
wallets.push(wallet); wallets.push(wallet);
containers.push(container); containers.push(container);
} }
@ -96,7 +98,7 @@ impl<'c> Monero {
Ok(wallet) Ok(wallet)
} }
pub async fn init(&self, wallet_amount: Vec<(&str, u64)>) -> Result<()> { pub async fn init_miner(&self) -> Result<()> {
let miner_wallet = self.wallet("miner")?; let miner_wallet = self.wallet("miner")?;
let miner_address = miner_wallet.address().await?.address; let miner_address = miner_wallet.address().await?.address;
@ -106,17 +108,34 @@ impl<'c> Monero {
tracing::info!("Generated {:?} blocks", res.blocks.len()); tracing::info!("Generated {:?} blocks", res.blocks.len());
miner_wallet.refresh().await?; miner_wallet.refresh().await?;
for (wallet, amount) in wallet_amount.iter() { Ok(())
if *amount > 0 { }
let wallet = self.wallet(wallet)?;
let address = wallet.address().await?.address; pub async fn init_wallet(&self, name: &str, amount_in_outputs: Vec<u64>) -> Result<()> {
miner_wallet.transfer(&address, *amount).await?; let miner_wallet = self.wallet("miner")?;
let miner_address = miner_wallet.address().await?.address;
let monerod = &self.monerod;
let wallet = self.wallet(name)?;
let address = wallet.address().await?.address;
for amount in amount_in_outputs {
if amount > 0 {
miner_wallet.transfer(&address, amount).await?;
tracing::info!("Funded {} wallet with {}", wallet.name, amount); tracing::info!("Funded {} wallet with {}", wallet.name, amount);
monerod.client().generate_blocks(10, &miner_address).await?; monerod.client().generate_blocks(10, &miner_address).await?;
wallet.refresh().await?; wallet.refresh().await?;
} }
} }
Ok(())
}
pub async fn start_miner(&self) -> Result<()> {
let miner_wallet = self.wallet("miner")?;
let miner_address = miner_wallet.address().await?.address;
let monerod = &self.monerod;
monerod.start_miner(&miner_address).await?; monerod.start_miner(&miner_address).await?;
tracing::info!("Waiting for miner wallet to catch up..."); tracing::info!("Waiting for miner wallet to catch up...");
@ -128,6 +147,13 @@ impl<'c> Monero {
Ok(()) Ok(())
} }
pub async fn init_and_start_miner(&self) -> Result<()> {
self.init_miner().await?;
self.start_miner().await?;
Ok(())
}
} }
fn random_prefix() -> String { fn random_prefix() -> String {
@ -209,6 +235,7 @@ impl<'c> MoneroWalletRpc {
cli: &'c Cli, cli: &'c Cli,
name: &str, name: &str,
monerod: &Monerod, monerod: &Monerod,
prefix: String,
) -> Result<(Self, Container<'c, Cli, image::Monero>)> { ) -> Result<(Self, Container<'c, Cli, image::Monero>)> {
let wallet_rpc_port: u16 = let wallet_rpc_port: u16 =
port_check::free_local_port().ok_or_else(|| anyhow!("Could not retrieve free port"))?; port_check::free_local_port().ok_or_else(|| anyhow!("Could not retrieve free port"))?;
@ -218,7 +245,8 @@ impl<'c> MoneroWalletRpc {
let network = monerod.network.clone(); let network = monerod.network.clone();
let run_args = RunArgs::default() let run_args = RunArgs::default()
.with_name(name) // prefix the container name so we can run multiple tests
.with_name(format!("{}{}", prefix, name))
.with_network(network.clone()) .with_network(network.clone())
.with_mapped_port(Port { .with_mapped_port(Port {
local: wallet_rpc_port, local: wallet_rpc_port,

View file

@ -14,7 +14,7 @@ async fn init_miner_and_mine_to_miner_address() {
let tc = Cli::default(); let tc = Cli::default();
let (monero, _monerod_container) = Monero::new(&tc, vec![]).await.unwrap(); let (monero, _monerod_container) = Monero::new(&tc, vec![]).await.unwrap();
monero.init(vec![]).await.unwrap(); monero.init_and_start_miner().await.unwrap();
let monerod = monero.monerod(); let monerod = monero.monerod();
let miner_wallet = monero.wallet("miner").unwrap(); let miner_wallet = monero.wallet("miner").unwrap();

View file

@ -22,11 +22,10 @@ async fn fund_transfer_and_check_tx_key() {
let alice_wallet = monero.wallet("alice").unwrap(); let alice_wallet = monero.wallet("alice").unwrap();
let bob_wallet = monero.wallet("bob").unwrap(); let bob_wallet = monero.wallet("bob").unwrap();
// fund alice monero.init_miner().await.unwrap();
monero monero.init_wallet("alice", vec![fund_alice]).await.unwrap();
.init(vec![("alice", fund_alice), ("bob", fund_bob)]) monero.init_wallet("bob", vec![fund_bob]).await.unwrap();
.await monero.start_miner().await.unwrap();
.unwrap();
// check alice balance // check alice balance
let got_alice_balance = alice_wallet.balance().await.unwrap(); let got_alice_balance = alice_wallet.balance().await.unwrap();

View file

@ -279,10 +279,19 @@ impl Client {
.text() .text()
.await?; .await?;
debug!("transfer RPC response: {}", response); debug!("check_tx_key RPC response: {}", response);
let r = serde_json::from_str::<Response<CheckTxKey>>(&response)?; let check_tx_key = serde_json::from_str::<Response<CheckTxKey>>(&response)?;
Ok(r.result) let mut check_tx_key = check_tx_key.result;
// Due to a bug in monerod that causes check_tx_key confirmations
// to overflow we safeguard the confirmations to avoid unwanted
// side effects.
if check_tx_key.confirmations > u64::MAX - 1000 {
check_tx_key.confirmations = 0u64;
}
Ok(check_tx_key)
} }
pub async fn generate_from_keys( pub async fn generate_from_keys(

View file

@ -79,8 +79,9 @@ async fn main() -> Result<()> {
let mut swarm = swarm::new::<Behaviour>(&seed)?; let mut swarm = swarm::new::<Behaviour>(&seed)?;
swarm.add_address(alice_peer_id, alice_multiaddr); swarm.add_address(alice_peer_id, alice_multiaddr);
let swap_id = Uuid::new_v4();
let (event_loop, mut event_loop_handle) = let (event_loop, mut event_loop_handle) =
EventLoop::new(swarm, alice_peer_id, bitcoin_wallet.clone())?; EventLoop::new(swap_id, swarm, alice_peer_id, bitcoin_wallet.clone())?;
let event_loop = tokio::spawn(event_loop.run()); let event_loop = tokio::spawn(event_loop.run());
let send_bitcoin = determine_btc_to_swap( let send_bitcoin = determine_btc_to_swap(
@ -174,7 +175,7 @@ async fn main() -> Result<()> {
swarm.add_address(alice_peer_id, alice_multiaddr); swarm.add_address(alice_peer_id, alice_multiaddr);
let (event_loop, event_loop_handle) = let (event_loop, event_loop_handle) =
EventLoop::new(swarm, alice_peer_id, bitcoin_wallet.clone())?; EventLoop::new(swap_id, swarm, alice_peer_id, bitcoin_wallet.clone())?;
let handle = tokio::spawn(event_loop.run()); let handle = tokio::spawn(event_loop.run());
let swap = Builder::new( let swap = Builder::new(

View file

@ -5,6 +5,7 @@ use libp2p::request_response::{
RequestResponseMessage, RequestResponseMessage,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use uuid::Uuid;
pub type OutEvent = RequestResponseEvent<Request, ()>; pub type OutEvent = RequestResponseEvent<Request, ()>;
@ -19,6 +20,7 @@ impl ProtocolName for EncryptedSignatureProtocol {
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Request { pub struct Request {
pub swap_id: Uuid,
pub tx_redeem_encsig: crate::bitcoin::EncryptedSignature, pub tx_redeem_encsig: crate::bitcoin::EncryptedSignature,
} }

View file

@ -6,6 +6,7 @@ use libp2p::request_response::{
RequestResponseMessage, RequestResponseMessage,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use uuid::Uuid;
pub type OutEvent = RequestResponseEvent<Request, ()>; pub type OutEvent = RequestResponseEvent<Request, ()>;
@ -20,6 +21,7 @@ impl ProtocolName for TransferProofProtocol {
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Request { pub struct Request {
pub swap_id: Uuid,
pub tx_lock_proof: monero::TransferProof, pub tx_lock_proof: monero::TransferProof,
} }

View file

@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
use sha2::Sha256; use sha2::Sha256;
use sigma_fun::ext::dl_secp256k1_ed25519_eq::{CrossCurveDLEQ, CrossCurveDLEQProof}; use sigma_fun::ext::dl_secp256k1_ed25519_eq::{CrossCurveDLEQ, CrossCurveDLEQProof};
use sigma_fun::HashTranscript; use sigma_fun::HashTranscript;
use uuid::Uuid;
pub mod alice; pub mod alice;
pub mod bob; pub mod bob;
@ -18,14 +19,9 @@ pub static CROSS_CURVE_PROOF_SYSTEM: Lazy<
) )
}); });
#[derive(Debug, Copy, Clone)]
pub struct StartingBalances {
pub xmr: monero::Amount,
pub btc: bitcoin::Amount,
}
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Message0 { pub struct Message0 {
swap_id: Uuid,
B: bitcoin::PublicKey, B: bitcoin::PublicKey,
S_b_monero: monero::PublicKey, S_b_monero: monero::PublicKey,
S_b_bitcoin: bitcoin::PublicKey, S_b_bitcoin: bitcoin::PublicKey,

View file

@ -6,6 +6,7 @@ use libp2p::request_response::{
RequestId, RequestResponseEvent, RequestResponseMessage, ResponseChannel, RequestId, RequestResponseEvent, RequestResponseMessage, ResponseChannel,
}; };
use libp2p::{NetworkBehaviour, PeerId}; use libp2p::{NetworkBehaviour, PeerId};
use uuid::Uuid;
#[derive(Debug)] #[derive(Debug)]
pub enum OutEvent { pub enum OutEvent {
@ -20,6 +21,7 @@ pub enum OutEvent {
}, },
ExecutionSetupDone { ExecutionSetupDone {
bob_peer_id: PeerId, bob_peer_id: PeerId,
swap_id: Uuid,
state3: Box<State3>, state3: Box<State3>,
}, },
TransferProofAcknowledged { TransferProofAcknowledged {
@ -157,9 +159,11 @@ impl From<execution_setup::OutEvent> for OutEvent {
match event { match event {
Done { Done {
bob_peer_id, bob_peer_id,
swap_id,
state3, state3,
} => OutEvent::ExecutionSetupDone { } => OutEvent::ExecutionSetupDone {
bob_peer_id, bob_peer_id,
swap_id,
state3: Box::new(state3), state3: Box::new(state3),
}, },
Failure { peer, error } => OutEvent::Failure { peer, error }, Failure { peer, error } => OutEvent::Failure { peer, error },

View file

@ -3,7 +3,7 @@ use crate::database::Database;
use crate::env::Config; use crate::env::Config;
use crate::monero::BalanceTooLow; use crate::monero::BalanceTooLow;
use crate::network::quote::BidQuote; use crate::network::quote::BidQuote;
use crate::network::{encrypted_signature, spot_price, transfer_proof}; use crate::network::{spot_price, transfer_proof};
use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State0, State3, Swap}; use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State0, State3, Swap};
use crate::{bitcoin, kraken, monero}; use crate::{bitcoin, kraken, monero};
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
@ -43,16 +43,15 @@ pub struct EventLoop<RS> {
swap_sender: mpsc::Sender<Swap>, swap_sender: mpsc::Sender<Swap>,
/// Stores a sender per peer for incoming [`EncryptedSignature`]s. /// Stores incoming [`EncryptedSignature`]s per swap.
recv_encrypted_signature: recv_encrypted_signature: HashMap<Uuid, bmrng::RequestSender<bitcoin::EncryptedSignature, ()>>,
HashMap<PeerId, bmrng::RequestSender<encrypted_signature::Request, ()>>,
inflight_encrypted_signatures: FuturesUnordered<BoxFuture<'static, ResponseChannel<()>>>, inflight_encrypted_signatures: FuturesUnordered<BoxFuture<'static, ResponseChannel<()>>>,
send_transfer_proof: FuturesUnordered<OutgoingTransferProof>, send_transfer_proof: FuturesUnordered<OutgoingTransferProof>,
/// Tracks [`transfer_proof::Request`]s which could not yet be sent because /// Tracks [`transfer_proof::Request`]s which could not yet be sent because
/// we are currently disconnected from the peer. /// we are currently disconnected from the peer.
buffered_transfer_proofs: HashMap<PeerId, (transfer_proof::Request, bmrng::Responder<()>)>, buffered_transfer_proofs: HashMap<PeerId, Vec<(transfer_proof::Request, bmrng::Responder<()>)>>,
/// Tracks [`transfer_proof::Request`]s which are currently inflight and /// Tracks [`transfer_proof::Request`]s which are currently inflight and
/// awaiting an acknowledgement. /// awaiting an acknowledgement.
@ -120,7 +119,7 @@ where
} }
}; };
let handle = self.new_handle(peer_id); let handle = self.new_handle(peer_id, swap_id);
let swap = Swap { let swap = Swap {
event_loop_handle: handle, event_loop_handle: handle,
@ -186,25 +185,26 @@ where
tracing::debug!(%peer, "Failed to respond with quote"); tracing::debug!(%peer, "Failed to respond with quote");
} }
} }
SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone{bob_peer_id, state3}) => { SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone{bob_peer_id, swap_id, state3}) => {
let _ = self.handle_execution_setup_done(bob_peer_id, *state3).await; let _ = self.handle_execution_setup_done(bob_peer_id, swap_id, *state3).await;
} }
SwarmEvent::Behaviour(OutEvent::TransferProofAcknowledged { peer, id }) => { SwarmEvent::Behaviour(OutEvent::TransferProofAcknowledged { peer, id }) => {
tracing::trace!(%peer, "Bob acknowledged transfer proof"); tracing::debug!(%peer, "Bob acknowledged transfer proof");
if let Some(responder) = self.inflight_transfer_proofs.remove(&id) { if let Some(responder) = self.inflight_transfer_proofs.remove(&id) {
let _ = responder.respond(()); let _ = responder.respond(());
} }
} }
SwarmEvent::Behaviour(OutEvent::EncryptedSignatureReceived{ msg, channel, peer }) => { SwarmEvent::Behaviour(OutEvent::EncryptedSignatureReceived{ msg, channel, peer }) => {
let sender = match self.recv_encrypted_signature.remove(&peer) { let sender = match self.recv_encrypted_signature.remove(&msg.swap_id) {
Some(sender) => sender, Some(sender) => sender,
None => { None => {
// TODO: Don't just drop encsig if we currently don't have a running swap for it, save in db
tracing::warn!(%peer, "No sender for encrypted signature, maybe already handled?"); tracing::warn!(%peer, "No sender for encrypted signature, maybe already handled?");
continue; continue;
} }
}; };
let mut responder = match sender.send(*msg).await { let mut responder = match sender.send(msg.tx_redeem_encsig).await {
Ok(responder) => responder, Ok(responder) => responder,
Err(_) => { Err(_) => {
tracing::warn!(%peer, "Failed to relay encrypted signature to swap"); tracing::warn!(%peer, "Failed to relay encrypted signature to swap");
@ -225,11 +225,13 @@ where
SwarmEvent::ConnectionEstablished { peer_id: peer, endpoint, .. } => { SwarmEvent::ConnectionEstablished { peer_id: peer, endpoint, .. } => {
tracing::debug!(%peer, address = %endpoint.get_remote_address(), "New connection established"); tracing::debug!(%peer, address = %endpoint.get_remote_address(), "New connection established");
if let Some((transfer_proof, responder)) = self.buffered_transfer_proofs.remove(&peer) { if let Some(transfer_proofs) = self.buffered_transfer_proofs.remove(&peer) {
tracing::debug!(%peer, "Found buffered transfer proof for peer"); for (transfer_proof, responder) in transfer_proofs {
tracing::debug!(%peer, "Found buffered transfer proof for peer");
let id = self.swarm.transfer_proof.send_request(&peer, transfer_proof); let id = self.swarm.transfer_proof.send_request(&peer, transfer_proof);
self.inflight_transfer_proofs.insert(id, responder); self.inflight_transfer_proofs.insert(id, responder);
}
} }
} }
SwarmEvent::IncomingConnectionError { send_back_addr: address, error, .. } => { SwarmEvent::IncomingConnectionError { send_back_addr: address, error, .. } => {
@ -253,15 +255,15 @@ where
Some(Ok((peer, transfer_proof, responder))) => { Some(Ok((peer, transfer_proof, responder))) => {
if !self.swarm.transfer_proof.is_connected(&peer) { if !self.swarm.transfer_proof.is_connected(&peer) {
tracing::warn!(%peer, "No active connection to peer, buffering transfer proof"); tracing::warn!(%peer, "No active connection to peer, buffering transfer proof");
self.buffered_transfer_proofs.insert(peer, (transfer_proof, responder)); self.buffered_transfer_proofs.entry(peer).or_insert_with(Vec::new).push((transfer_proof, responder));
continue; continue;
} }
let id = self.swarm.transfer_proof.send_request(&peer, transfer_proof); let id = self.swarm.transfer_proof.send_request(&peer, transfer_proof);
self.inflight_transfer_proofs.insert(id, responder); self.inflight_transfer_proofs.insert(id, responder);
}, },
Some(Err(_)) => { Some(Err(e)) => {
tracing::debug!("A swap stopped without sending a transfer proof"); tracing::debug!("A swap stopped without sending a transfer proof: {:#}", e);
} }
None => { None => {
unreachable!("stream of transfer proof receivers must never terminate") unreachable!("stream of transfer proof receivers must never terminate")
@ -317,9 +319,13 @@ where
}) })
} }
async fn handle_execution_setup_done(&mut self, bob_peer_id: PeerId, state3: State3) { async fn handle_execution_setup_done(
let swap_id = Uuid::new_v4(); &mut self,
let handle = self.new_handle(bob_peer_id); bob_peer_id: PeerId,
swap_id: Uuid,
state3: State3,
) {
let handle = self.new_handle(bob_peer_id, swap_id);
let initial_state = AliceState::Started { let initial_state = AliceState::Started {
state3: Box::new(state3), state3: Box::new(state3),
@ -335,7 +341,7 @@ where
swap_id, swap_id,
}; };
// TODO: Consider adding separate components for start/rsume of swaps // TODO: Consider adding separate components for start/resume of swaps
// swaps save peer id so we can resume // swaps save peer id so we can resume
match self.db.insert_peer_id(swap_id, bob_peer_id).await { match self.db.insert_peer_id(swap_id, bob_peer_id).await {
@ -352,17 +358,24 @@ where
/// Create a new [`EventLoopHandle`] that is scoped for communication with /// Create a new [`EventLoopHandle`] that is scoped for communication with
/// the given peer. /// the given peer.
fn new_handle(&mut self, peer: PeerId) -> EventLoopHandle { fn new_handle(&mut self, peer: PeerId, swap_id: Uuid) -> EventLoopHandle {
// we deliberately don't put timeouts on these channels because the swap always // we deliberately don't put timeouts on these channels because the swap always
// races these futures against a timelock // races these futures against a timelock
let (transfer_proof_sender, mut transfer_proof_receiver) = bmrng::channel(1); let (transfer_proof_sender, mut transfer_proof_receiver) = bmrng::channel(1);
let encrypted_signature = bmrng::channel(1); let encrypted_signature = bmrng::channel(1);
self.recv_encrypted_signature self.recv_encrypted_signature
.insert(peer, encrypted_signature.0); .insert(swap_id, encrypted_signature.0);
self.send_transfer_proof.push( self.send_transfer_proof.push(
async move { async move {
let (request, responder) = transfer_proof_receiver.recv().await?; let (transfer_proof, responder) = transfer_proof_receiver.recv().await?;
let request = transfer_proof::Request {
swap_id,
tx_lock_proof: transfer_proof,
};
Ok((peer, request, responder)) Ok((peer, request, responder))
} }
@ -440,13 +453,13 @@ impl LatestRate for KrakenRate {
#[derive(Debug)] #[derive(Debug)]
pub struct EventLoopHandle { pub struct EventLoopHandle {
recv_encrypted_signature: Option<bmrng::RequestReceiver<encrypted_signature::Request, ()>>, recv_encrypted_signature: Option<bmrng::RequestReceiver<bitcoin::EncryptedSignature, ()>>,
send_transfer_proof: Option<bmrng::RequestSender<transfer_proof::Request, ()>>, send_transfer_proof: Option<bmrng::RequestSender<monero::TransferProof, ()>>,
} }
impl EventLoopHandle { impl EventLoopHandle {
pub async fn recv_encrypted_signature(&mut self) -> Result<bitcoin::EncryptedSignature> { pub async fn recv_encrypted_signature(&mut self) -> Result<bitcoin::EncryptedSignature> {
let (request, responder) = self let (tx_redeem_encsig, responder) = self
.recv_encrypted_signature .recv_encrypted_signature
.take() .take()
.context("Encrypted signature was already received")? .context("Encrypted signature was already received")?
@ -457,14 +470,14 @@ impl EventLoopHandle {
.respond(()) .respond(())
.context("Failed to acknowledge receipt of encrypted signature")?; .context("Failed to acknowledge receipt of encrypted signature")?;
Ok(request.tx_redeem_encsig) Ok(tx_redeem_encsig)
} }
pub async fn send_transfer_proof(&mut self, msg: monero::TransferProof) -> Result<()> { pub async fn send_transfer_proof(&mut self, msg: monero::TransferProof) -> Result<()> {
self.send_transfer_proof self.send_transfer_proof
.take() .take()
.context("Transfer proof was already sent")? .context("Transfer proof was already sent")?
.send_receive(transfer_proof::Request { tx_lock_proof: msg }) .send_receive(msg)
.await .await
.context("Failed to send transfer proof")?; .context("Failed to send transfer proof")?;

View file

@ -4,18 +4,27 @@ use crate::protocol::{Message0, Message2, Message4};
use anyhow::{Context, Error}; use anyhow::{Context, Error};
use libp2p::PeerId; use libp2p::PeerId;
use libp2p_async_await::BehaviourOutEvent; use libp2p_async_await::BehaviourOutEvent;
use uuid::Uuid;
#[derive(Debug)] #[derive(Debug)]
pub enum OutEvent { pub enum OutEvent {
Done { bob_peer_id: PeerId, state3: State3 }, Done {
Failure { peer: PeerId, error: Error }, bob_peer_id: PeerId,
swap_id: Uuid,
state3: State3,
},
Failure {
peer: PeerId,
error: Error,
},
} }
impl From<BehaviourOutEvent<(PeerId, State3), (), Error>> for OutEvent { impl From<BehaviourOutEvent<(PeerId, (Uuid, State3)), (), Error>> for OutEvent {
fn from(event: BehaviourOutEvent<(PeerId, State3), (), Error>) -> Self { fn from(event: BehaviourOutEvent<(PeerId, (Uuid, State3)), (), Error>) -> Self {
match event { match event {
BehaviourOutEvent::Inbound(_, Ok((bob_peer_id, state3))) => OutEvent::Done { BehaviourOutEvent::Inbound(_, Ok((bob_peer_id, (swap_id, state3)))) => OutEvent::Done {
bob_peer_id, bob_peer_id,
swap_id,
state3, state3,
}, },
BehaviourOutEvent::Inbound(peer, Err(e)) => OutEvent::Failure { peer, error: e }, BehaviourOutEvent::Inbound(peer, Err(e)) => OutEvent::Failure { peer, error: e },
@ -27,7 +36,7 @@ impl From<BehaviourOutEvent<(PeerId, State3), (), Error>> for OutEvent {
#[derive(libp2p::NetworkBehaviour)] #[derive(libp2p::NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", event_process = false)] #[behaviour(out_event = "OutEvent", event_process = false)]
pub struct Behaviour { pub struct Behaviour {
inner: libp2p_async_await::Behaviour<(PeerId, State3), (), anyhow::Error>, inner: libp2p_async_await::Behaviour<(PeerId, (Uuid, State3)), (), anyhow::Error>,
} }
impl Default for Behaviour { impl Default for Behaviour {
@ -45,7 +54,7 @@ impl Behaviour {
let message0 = let message0 =
serde_cbor::from_slice::<Message0>(&substream.read_message(BUF_SIZE).await?) serde_cbor::from_slice::<Message0>(&substream.read_message(BUF_SIZE).await?)
.context("Failed to deserialize message0")?; .context("Failed to deserialize message0")?;
let state1 = state0.receive(message0)?; let (swap_id, state1) = state0.receive(message0)?;
substream substream
.write_message( .write_message(
@ -73,7 +82,7 @@ impl Behaviour {
.context("Failed to deserialize message4")?; .context("Failed to deserialize message4")?;
let state3 = state2.receive(message4)?; let state3 = state2.receive(message4)?;
Ok((bob, state3)) Ok((bob, (swap_id, state3)))
}) })
} }
} }

View file

@ -13,6 +13,7 @@ use rand::{CryptoRng, RngCore};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sigma_fun::ext::dl_secp256k1_ed25519_eq::CrossCurveDLEQProof; use sigma_fun::ext::dl_secp256k1_ed25519_eq::CrossCurveDLEQProof;
use std::fmt; use std::fmt;
use uuid::Uuid;
#[derive(Debug)] #[derive(Debug)]
pub enum AliceState { pub enum AliceState {
@ -146,7 +147,7 @@ impl State0 {
}) })
} }
pub fn receive(self, msg: Message0) -> Result<State1> { pub fn receive(self, msg: Message0) -> Result<(Uuid, State1)> {
let valid = CROSS_CURVE_PROOF_SYSTEM.verify( let valid = CROSS_CURVE_PROOF_SYSTEM.verify(
&msg.dleq_proof_s_b, &msg.dleq_proof_s_b,
( (
@ -164,7 +165,7 @@ impl State0 {
let v = self.v_a + msg.v_b; let v = self.v_a + msg.v_b;
Ok(State1 { Ok((msg.swap_id, State1 {
a: self.a, a: self.a,
B: msg.B, B: msg.B,
s_a: self.s_a, s_a: self.s_a,
@ -182,7 +183,7 @@ impl State0 {
refund_address: msg.refund_address, refund_address: msg.refund_address,
redeem_address: self.redeem_address, redeem_address: self.redeem_address,
punish_address: self.punish_address, punish_address: self.punish_address,
}) }))
} }
} }

View file

@ -5,7 +5,6 @@ use crate::env::Config;
use crate::protocol::alice; use crate::protocol::alice;
use crate::protocol::alice::event_loop::EventLoopHandle; use crate::protocol::alice::event_loop::EventLoopHandle;
use crate::protocol::alice::AliceState; use crate::protocol::alice::AliceState;
use crate::protocol::alice::AliceState::XmrLockTransferProofSent;
use crate::{bitcoin, database, monero}; use crate::{bitcoin, database, monero};
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use tokio::select; use tokio::select;
@ -17,7 +16,7 @@ pub async fn run(swap: alice::Swap) -> Result<AliceState> {
run_until(swap, |_| false).await run_until(swap, |_| false).await
} }
#[tracing::instrument(name = "swap", skip(swap,exit_early), fields(id = %swap.swap_id))] #[tracing::instrument(name = "swap", skip(swap,exit_early), fields(id = %swap.swap_id), err)]
pub async fn run_until( pub async fn run_until(
mut swap: alice::Swap, mut swap: alice::Swap,
exit_early: fn(&AliceState) -> bool, exit_early: fn(&AliceState) -> bool,
@ -130,7 +129,7 @@ async fn next_state(
result = event_loop_handle.send_transfer_proof(transfer_proof.clone()) => { result = event_loop_handle.send_transfer_proof(transfer_proof.clone()) => {
result?; result?;
XmrLockTransferProofSent { AliceState::XmrLockTransferProofSent {
monero_wallet_restore_blockheight, monero_wallet_restore_blockheight,
transfer_proof, transfer_proof,
state3, state3,

View file

@ -1,6 +1,6 @@
use crate::bitcoin::EncryptedSignature; use crate::bitcoin::EncryptedSignature;
use crate::network::quote::BidQuote; use crate::network::quote::BidQuote;
use crate::network::{encrypted_signature, spot_price, transfer_proof}; use crate::network::{encrypted_signature, spot_price};
use crate::protocol::bob::{Behaviour, OutEvent, State0, State2}; use crate::protocol::bob::{Behaviour, OutEvent, State0, State2};
use crate::{bitcoin, monero}; use crate::{bitcoin, monero};
use anyhow::{Context, Result}; use anyhow::{Context, Result};
@ -12,9 +12,11 @@ use libp2p::{PeerId, Swarm};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use uuid::Uuid;
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct EventLoop { pub struct EventLoop {
swap_id: Uuid,
swarm: libp2p::Swarm<Behaviour>, swarm: libp2p::Swarm<Behaviour>,
bitcoin_wallet: Arc<bitcoin::Wallet>, bitcoin_wallet: Arc<bitcoin::Wallet>,
alice_peer_id: PeerId, alice_peer_id: PeerId,
@ -22,7 +24,7 @@ pub struct EventLoop {
// these streams represents outgoing requests that we have to make // these streams represents outgoing requests that we have to make
quote_requests: bmrng::RequestReceiverStream<(), BidQuote>, quote_requests: bmrng::RequestReceiverStream<(), BidQuote>,
spot_price_requests: bmrng::RequestReceiverStream<spot_price::Request, spot_price::Response>, spot_price_requests: bmrng::RequestReceiverStream<spot_price::Request, spot_price::Response>,
encrypted_signature_requests: bmrng::RequestReceiverStream<encrypted_signature::Request, ()>, encrypted_signatures: bmrng::RequestReceiverStream<EncryptedSignature, ()>,
execution_setup_requests: bmrng::RequestReceiverStream<State0, Result<State2>>, execution_setup_requests: bmrng::RequestReceiverStream<State0, Result<State2>>,
// these represents requests that are currently in-flight. // these represents requests that are currently in-flight.
@ -34,7 +36,7 @@ pub struct EventLoop {
inflight_execution_setup: Option<bmrng::Responder<Result<State2>>>, inflight_execution_setup: Option<bmrng::Responder<Result<State2>>>,
/// The sender we will use to relay incoming transfer proofs. /// The sender we will use to relay incoming transfer proofs.
transfer_proof: bmrng::RequestSender<transfer_proof::Request, ()>, transfer_proof: bmrng::RequestSender<monero::TransferProof, ()>,
/// The future representing the successful handling of an incoming transfer /// The future representing the successful handling of an incoming transfer
/// proof. /// proof.
/// ///
@ -47,6 +49,7 @@ pub struct EventLoop {
impl EventLoop { impl EventLoop {
pub fn new( pub fn new(
swap_id: Uuid,
swarm: Swarm<Behaviour>, swarm: Swarm<Behaviour>,
alice_peer_id: PeerId, alice_peer_id: PeerId,
bitcoin_wallet: Arc<bitcoin::Wallet>, bitcoin_wallet: Arc<bitcoin::Wallet>,
@ -58,12 +61,13 @@ impl EventLoop {
let quote = bmrng::channel_with_timeout(1, Duration::from_secs(30)); let quote = bmrng::channel_with_timeout(1, Duration::from_secs(30));
let event_loop = EventLoop { let event_loop = EventLoop {
swap_id,
swarm, swarm,
alice_peer_id, alice_peer_id,
bitcoin_wallet, bitcoin_wallet,
execution_setup_requests: execution_setup.1.into(), execution_setup_requests: execution_setup.1.into(),
transfer_proof: transfer_proof.0, transfer_proof: transfer_proof.0,
encrypted_signature_requests: encrypted_signature.1.into(), encrypted_signatures: encrypted_signature.1.into(),
spot_price_requests: spot_price.1.into(), spot_price_requests: spot_price.1.into(),
quote_requests: quote.1.into(), quote_requests: quote.1.into(),
inflight_spot_price_requests: HashMap::default(), inflight_spot_price_requests: HashMap::default(),
@ -108,10 +112,20 @@ impl EventLoop {
} }
} }
SwarmEvent::Behaviour(OutEvent::TransferProofReceived { msg, channel }) => { SwarmEvent::Behaviour(OutEvent::TransferProofReceived { msg, channel }) => {
let mut responder = match self.transfer_proof.send(*msg).await { if msg.swap_id != self.swap_id {
// TODO: Save unexpected transfer proofs in the database and check for messages in the database when handling swaps
tracing::warn!("Received unexpected transfer proof for swap {} while running swap {}. This transfer proof will be ignored.", msg.swap_id, self.swap_id);
// When receiving a transfer proof that is unexpected we still have to acknowledge that it was received
let _ = self.swarm.transfer_proof.send_response(channel, ());
continue;
}
let mut responder = match self.transfer_proof.send(msg.tx_lock_proof).await {
Ok(responder) => responder, Ok(responder) => responder,
Err(_) => { Err(e) => {
tracing::warn!("Failed to pass on transfer proof"); tracing::warn!("Failed to pass on transfer proof: {:#}", e);
continue; continue;
} }
}; };
@ -180,7 +194,12 @@ impl EventLoop {
self.swarm.execution_setup.run(self.alice_peer_id, request, self.bitcoin_wallet.clone()); self.swarm.execution_setup.run(self.alice_peer_id, request, self.bitcoin_wallet.clone());
self.inflight_execution_setup = Some(responder); self.inflight_execution_setup = Some(responder);
}, },
Some((request, responder)) = self.encrypted_signature_requests.next().fuse(), if self.is_connected_to_alice() => { Some((tx_redeem_encsig, responder)) = self.encrypted_signatures.next().fuse(), if self.is_connected_to_alice() => {
let request = encrypted_signature::Request {
swap_id: self.swap_id,
tx_redeem_encsig
};
let id = self.swarm.encrypted_signature.send_request(&self.alice_peer_id, request); let id = self.swarm.encrypted_signature.send_request(&self.alice_peer_id, request);
self.inflight_encrypted_signature_requests.insert(id, responder); self.inflight_encrypted_signature_requests.insert(id, responder);
}, },
@ -202,8 +221,8 @@ impl EventLoop {
#[derive(Debug)] #[derive(Debug)]
pub struct EventLoopHandle { pub struct EventLoopHandle {
execution_setup: bmrng::RequestSender<State0, Result<State2>>, execution_setup: bmrng::RequestSender<State0, Result<State2>>,
transfer_proof: bmrng::RequestReceiver<transfer_proof::Request, ()>, transfer_proof: bmrng::RequestReceiver<monero::TransferProof, ()>,
encrypted_signature: bmrng::RequestSender<encrypted_signature::Request, ()>, encrypted_signature: bmrng::RequestSender<EncryptedSignature, ()>,
spot_price: bmrng::RequestSender<spot_price::Request, spot_price::Response>, spot_price: bmrng::RequestSender<spot_price::Request, spot_price::Response>,
quote: bmrng::RequestSender<(), BidQuote>, quote: bmrng::RequestSender<(), BidQuote>,
} }
@ -213,8 +232,8 @@ impl EventLoopHandle {
self.execution_setup.send_receive(state0).await? self.execution_setup.send_receive(state0).await?
} }
pub async fn recv_transfer_proof(&mut self) -> Result<transfer_proof::Request> { pub async fn recv_transfer_proof(&mut self) -> Result<monero::TransferProof> {
let (request, responder) = self let (transfer_proof, responder) = self
.transfer_proof .transfer_proof
.recv() .recv()
.await .await
@ -223,7 +242,7 @@ impl EventLoopHandle {
.respond(()) .respond(())
.context("Failed to acknowledge receipt of transfer proof")?; .context("Failed to acknowledge receipt of transfer proof")?;
Ok(request) Ok(transfer_proof)
} }
pub async fn request_spot_price(&mut self, btc: bitcoin::Amount) -> Result<monero::Amount> { pub async fn request_spot_price(&mut self, btc: bitcoin::Amount) -> Result<monero::Amount> {
@ -244,7 +263,7 @@ impl EventLoopHandle {
) -> Result<()> { ) -> Result<()> {
Ok(self Ok(self
.encrypted_signature .encrypted_signature
.send_receive(encrypted_signature::Request { tx_redeem_encsig }) .send_receive(tx_redeem_encsig)
.await?) .await?)
} }
} }

View file

@ -17,6 +17,7 @@ use serde::{Deserialize, Serialize};
use sha2::Sha256; use sha2::Sha256;
use sigma_fun::ext::dl_secp256k1_ed25519_eq::CrossCurveDLEQProof; use sigma_fun::ext::dl_secp256k1_ed25519_eq::CrossCurveDLEQProof;
use std::fmt; use std::fmt;
use uuid::Uuid;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum BobState { pub enum BobState {
@ -69,6 +70,7 @@ impl fmt::Display for BobState {
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
pub struct State0 { pub struct State0 {
swap_id: Uuid,
b: bitcoin::SecretKey, b: bitcoin::SecretKey,
s_b: monero::Scalar, s_b: monero::Scalar,
S_b_monero: monero::PublicKey, S_b_monero: monero::PublicKey,
@ -84,7 +86,9 @@ pub struct State0 {
} }
impl State0 { impl State0 {
#[allow(clippy::too_many_arguments)]
pub fn new<R: RngCore + CryptoRng>( pub fn new<R: RngCore + CryptoRng>(
swap_id: Uuid,
rng: &mut R, rng: &mut R,
btc: bitcoin::Amount, btc: bitcoin::Amount,
xmr: monero::Amount, xmr: monero::Amount,
@ -101,6 +105,7 @@ impl State0 {
let (dleq_proof_s_b, (S_b_bitcoin, S_b_monero)) = CROSS_CURVE_PROOF_SYSTEM.prove(&s_b, rng); let (dleq_proof_s_b, (S_b_bitcoin, S_b_monero)) = CROSS_CURVE_PROOF_SYSTEM.prove(&s_b, rng);
Self { Self {
swap_id,
b, b,
s_b, s_b,
v_b, v_b,
@ -120,6 +125,7 @@ impl State0 {
pub fn next_message(&self) -> Message0 { pub fn next_message(&self) -> Message0 {
Message0 { Message0 {
swap_id: self.swap_id,
B: self.b.public(), B: self.b.public(),
S_b_monero: self.S_b_monero, S_b_monero: self.S_b_monero,
S_b_bitcoin: self.S_b_bitcoin, S_b_bitcoin: self.S_b_bitcoin,

View file

@ -68,6 +68,7 @@ async fn next_state(
let bitcoin_refund_address = bitcoin_wallet.new_address().await?; let bitcoin_refund_address = bitcoin_wallet.new_address().await?;
let state2 = request_price_and_setup( let state2 = request_price_and_setup(
swap_id,
btc_amount, btc_amount,
event_loop_handle, event_loop_handle,
env_config, env_config,
@ -106,7 +107,7 @@ async fn next_state(
select! { select! {
transfer_proof = transfer_proof_watcher => { transfer_proof = transfer_proof_watcher => {
let transfer_proof = transfer_proof?.tx_lock_proof; let transfer_proof = transfer_proof?;
tracing::info!(txid = %transfer_proof.tx_hash(), "Alice locked Monero"); tracing::info!(txid = %transfer_proof.tx_hash(), "Alice locked Monero");
@ -259,6 +260,7 @@ async fn next_state(
} }
pub async fn request_price_and_setup( pub async fn request_price_and_setup(
swap_id: Uuid,
btc: bitcoin::Amount, btc: bitcoin::Amount,
event_loop_handle: &mut EventLoopHandle, event_loop_handle: &mut EventLoopHandle,
env_config: &Config, env_config: &Config,
@ -269,6 +271,7 @@ pub async fn request_price_and_setup(
tracing::info!("Spot price for {} is {}", btc, xmr); tracing::info!("Spot price for {} is {}", btc, xmr);
let state0 = State0::new( let state0 = State0::new(
swap_id,
&mut OsRng, &mut OsRng,
btc, btc,
xmr, xmr,

View file

@ -13,6 +13,7 @@ use swap::protocol::{alice, bob};
async fn alice_punishes_after_restart_if_punish_timelock_expired() { async fn alice_punishes_after_restart_if_punish_timelock_expired() {
harness::setup_test(FastPunishConfig, |mut ctx| async move { harness::setup_test(FastPunishConfig, |mut ctx| async move {
let (bob_swap, bob_join_handle) = ctx.bob_swap().await; let (bob_swap, bob_join_handle) = ctx.bob_swap().await;
let bob_swap_id = bob_swap.swap_id;
let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked));
let alice_swap = ctx.alice_next_swap().await; let alice_swap = ctx.alice_next_swap().await;
@ -33,11 +34,7 @@ async fn alice_punishes_after_restart_if_punish_timelock_expired() {
.wait_until_confirmed_with(state3.punish_timelock) .wait_until_confirmed_with(state3.punish_timelock)
.await?; .await?;
} else { } else {
panic!( panic!("Alice in unexpected state {}", alice_state);
"\
Alice in unexpected state {}",
alice_state
);
} }
ctx.restart_alice().await; ctx.restart_alice().await;
@ -49,7 +46,9 @@ async fn alice_punishes_after_restart_if_punish_timelock_expired() {
// Restart Bob after Alice punished to ensure Bob transitions to // Restart Bob after Alice punished to ensure Bob transitions to
// punished and does not run indefinitely // punished and does not run indefinitely
let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let (bob_swap, _) = ctx
.stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id)
.await;
assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); assert!(matches!(bob_swap.state, BobState::BtcLocked { .. }));
let bob_state = bob::run(bob_swap).await?; let bob_state = bob::run(bob_swap).await?;

View file

@ -9,6 +9,7 @@ use swap::protocol::{alice, bob};
async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() { async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() {
harness::setup_test(FastCancelConfig, |mut ctx| async move { harness::setup_test(FastCancelConfig, |mut ctx| async move {
let (bob_swap, bob_join_handle) = ctx.bob_swap().await; let (bob_swap, bob_join_handle) = ctx.bob_swap().await;
let bob_swap_id = bob_swap.swap_id;
let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked));
let alice_swap = ctx.alice_next_swap().await; let alice_swap = ctx.alice_next_swap().await;
@ -17,7 +18,9 @@ async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() {
let bob_state = bob_swap.await??; let bob_state = bob_swap.await??;
assert!(matches!(bob_state, BobState::BtcLocked { .. })); assert!(matches!(bob_state, BobState::BtcLocked { .. }));
let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let (bob_swap, bob_join_handle) = ctx
.stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id)
.await;
// Ensure cancel timelock is expired // Ensure cancel timelock is expired
if let BobState::BtcLocked(state3) = bob_swap.state.clone() { if let BobState::BtcLocked(state3) = bob_swap.state.clone() {
@ -43,7 +46,9 @@ async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() {
.await??; .await??;
assert!(matches!(state, BobState::BtcCancelled { .. })); assert!(matches!(state, BobState::BtcCancelled { .. }));
let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let (bob_swap, bob_join_handle) = ctx
.stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id)
.await;
assert!(matches!(bob_swap.state, BobState::BtcCancelled { .. })); assert!(matches!(bob_swap.state, BobState::BtcCancelled { .. }));
// Bob manually refunds // Bob manually refunds

View file

@ -10,6 +10,7 @@ use swap::protocol::{alice, bob};
async fn given_bob_manually_cancels_when_timelock_not_expired_errors() { async fn given_bob_manually_cancels_when_timelock_not_expired_errors() {
harness::setup_test(SlowCancelConfig, |mut ctx| async move { harness::setup_test(SlowCancelConfig, |mut ctx| async move {
let (bob_swap, bob_join_handle) = ctx.bob_swap().await; let (bob_swap, bob_join_handle) = ctx.bob_swap().await;
let bob_swap_id = bob_swap.swap_id;
let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked));
let alice_swap = ctx.alice_next_swap().await; let alice_swap = ctx.alice_next_swap().await;
@ -18,7 +19,9 @@ async fn given_bob_manually_cancels_when_timelock_not_expired_errors() {
let bob_state = bob_swap.await??; let bob_state = bob_swap.await??;
assert!(matches!(bob_state, BobState::BtcLocked { .. })); assert!(matches!(bob_state, BobState::BtcLocked { .. }));
let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let (bob_swap, bob_join_handle) = ctx
.stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id)
.await;
assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); assert!(matches!(bob_swap.state, BobState::BtcLocked { .. }));
// Bob tries but fails to manually cancel // Bob tries but fails to manually cancel
@ -35,7 +38,9 @@ async fn given_bob_manually_cancels_when_timelock_not_expired_errors() {
assert!(matches!(result, Error::CancelTimelockNotExpiredYet)); assert!(matches!(result, Error::CancelTimelockNotExpiredYet));
let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let (bob_swap, bob_join_handle) = ctx
.stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id)
.await;
assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); assert!(matches!(bob_swap.state, BobState::BtcLocked { .. }));
// Bob tries but fails to manually refund // Bob tries but fails to manually refund
@ -50,7 +55,9 @@ async fn given_bob_manually_cancels_when_timelock_not_expired_errors() {
.err() .err()
.unwrap(); .unwrap();
let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let (bob_swap, _) = ctx
.stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id)
.await;
assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); assert!(matches!(bob_swap.state, BobState::BtcLocked { .. }));
Ok(()) Ok(())

View file

@ -9,6 +9,7 @@ use swap::protocol::{alice, bob};
async fn given_bob_manually_forces_cancel_when_timelock_not_expired_errors() { async fn given_bob_manually_forces_cancel_when_timelock_not_expired_errors() {
harness::setup_test(SlowCancelConfig, |mut ctx| async move { harness::setup_test(SlowCancelConfig, |mut ctx| async move {
let (bob_swap, bob_join_handle) = ctx.bob_swap().await; let (bob_swap, bob_join_handle) = ctx.bob_swap().await;
let bob_swap_id = bob_swap.swap_id;
let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked));
let alice_swap = ctx.alice_next_swap().await; let alice_swap = ctx.alice_next_swap().await;
@ -17,7 +18,9 @@ async fn given_bob_manually_forces_cancel_when_timelock_not_expired_errors() {
let bob_state = bob_swap.await??; let bob_state = bob_swap.await??;
assert!(matches!(bob_state, BobState::BtcLocked { .. })); assert!(matches!(bob_state, BobState::BtcLocked { .. }));
let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let (bob_swap, bob_join_handle) = ctx
.stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id)
.await;
assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); assert!(matches!(bob_swap.state, BobState::BtcLocked { .. }));
// Bob forces a cancel that will fail // Bob forces a cancel that will fail
@ -33,7 +36,9 @@ async fn given_bob_manually_forces_cancel_when_timelock_not_expired_errors() {
assert!(is_error); assert!(is_error);
let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let (bob_swap, bob_join_handle) = ctx
.stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id)
.await;
assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); assert!(matches!(bob_swap.state, BobState::BtcLocked { .. }));
// Bob forces a refund that will fail // Bob forces a refund that will fail
@ -48,7 +53,9 @@ async fn given_bob_manually_forces_cancel_when_timelock_not_expired_errors() {
.is_err(); .is_err();
assert!(is_error); assert!(is_error);
let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let (bob_swap, _) = ctx
.stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id)
.await;
assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); assert!(matches!(bob_swap.state, BobState::BtcLocked { .. }));
Ok(()) Ok(())

View file

@ -0,0 +1,60 @@
pub mod harness;
use harness::bob_run_until::is_xmr_locked;
use harness::SlowCancelConfig;
use swap::protocol::alice::AliceState;
use swap::protocol::bob::BobState;
use swap::protocol::{alice, bob};
#[tokio::test]
async fn concurrent_bobs_after_xmr_lock_proof_sent() {
harness::setup_test(SlowCancelConfig, |mut ctx| async move {
let (bob_swap_1, bob_join_handle_1) = ctx.bob_swap().await;
let swap_id = bob_swap_1.swap_id;
let bob_swap_1 = tokio::spawn(bob::run_until(bob_swap_1, is_xmr_locked));
let alice_swap_1 = ctx.alice_next_swap().await;
let alice_swap_1 = tokio::spawn(alice::run(alice_swap_1));
let bob_state_1 = bob_swap_1.await??;
assert!(matches!(bob_state_1, BobState::XmrLocked { .. }));
// make sure bob_swap_1's event loop is gone
bob_join_handle_1.abort();
let (bob_swap_2, bob_join_handle_2) = ctx.bob_swap().await;
let bob_swap_2 = tokio::spawn(bob::run(bob_swap_2));
let alice_swap_2 = ctx.alice_next_swap().await;
let alice_swap_2 = tokio::spawn(alice::run(alice_swap_2));
// The 2nd swap should ALWAYS finish successfully in this
// scenario
let bob_state_2 = bob_swap_2.await??;
assert!(matches!(bob_state_2, BobState::XmrRedeemed { .. }));
let alice_state_2 = alice_swap_2.await??;
assert!(matches!(alice_state_2, AliceState::BtcRedeemed { .. }));
let (bob_swap_1, _) = ctx
.stop_and_resume_bob_from_db(bob_join_handle_2, swap_id)
.await;
assert!(matches!(bob_swap_1.state, BobState::XmrLocked { .. }));
// The 1st (paused) swap ALWAYS finishes successfully in this
// scenario, because it is ensured that Bob already received the
// transfer proof.
let bob_state_1 = bob::run(bob_swap_1).await?;
assert!(matches!(bob_state_1, BobState::XmrRedeemed { .. }));
let alice_state_1 = alice_swap_1.await??;
assert!(matches!(alice_state_1, AliceState::BtcRedeemed { .. }));
Ok(())
})
.await;
}

View file

@ -0,0 +1,61 @@
pub mod harness;
use harness::bob_run_until::is_btc_locked;
use harness::SlowCancelConfig;
use swap::protocol::alice::AliceState;
use swap::protocol::bob::BobState;
use swap::protocol::{alice, bob};
#[tokio::test]
async fn concurrent_bobs_before_xmr_lock_proof_sent() {
harness::setup_test(SlowCancelConfig, |mut ctx| async move {
let (bob_swap_1, bob_join_handle_1) = ctx.bob_swap().await;
let swap_id = bob_swap_1.swap_id;
let bob_swap_1 = tokio::spawn(bob::run_until(bob_swap_1, is_btc_locked));
let alice_swap_1 = ctx.alice_next_swap().await;
let alice_swap_1 = tokio::spawn(alice::run(alice_swap_1));
let bob_state_1 = bob_swap_1.await??;
assert!(matches!(bob_state_1, BobState::BtcLocked(_)));
// make sure bob_swap_1's event loop is gone
bob_join_handle_1.abort();
let (bob_swap_2, bob_join_handle_2) = ctx.bob_swap().await;
let bob_swap_2 = tokio::spawn(bob::run(bob_swap_2));
let alice_swap_2 = ctx.alice_next_swap().await;
let alice_swap_2 = tokio::spawn(alice::run(alice_swap_2));
// The 2nd swap ALWAYS finish successfully in this
// scenario, but will receive an "unwanted" transfer proof that is ignored in
// the event loop.
let bob_state_2 = bob_swap_2.await??;
assert!(matches!(bob_state_2, BobState::XmrRedeemed { .. }));
let alice_state_2 = alice_swap_2.await??;
assert!(matches!(alice_state_2, AliceState::BtcRedeemed { .. }));
let (bob_swap_1, _) = ctx
.stop_and_resume_bob_from_db(bob_join_handle_2, swap_id)
.await;
assert!(matches!(bob_state_1, BobState::BtcLocked(_)));
// The 1st (paused) swap is expected to refund, because the transfer
// proof is delivered to the wrong swap, and we currently don't store it in the
// database for the other swap.
let bob_state_1 = bob::run(bob_swap_1).await?;
assert!(matches!(bob_state_1, BobState::BtcRefunded { .. }));
let alice_state_1 = alice_swap_1.await??;
assert!(matches!(alice_state_1, AliceState::XmrRefunded { .. }));
Ok(())
})
.await;
}

View file

@ -0,0 +1,21 @@
pub mod harness;
use harness::SlowCancelConfig;
use swap::protocol::bob;
#[tokio::test]
async fn ensure_same_swap_id_for_alice_and_bob() {
harness::setup_test(SlowCancelConfig, |mut ctx| async move {
let (bob_swap, _) = ctx.bob_swap().await;
let bob_swap_id = bob_swap.swap_id;
let _ = tokio::spawn(bob::run(bob_swap));
// once Bob's swap is spawned we can retrieve Alice's swap and assert on the
// swap ID
let alice_swap = ctx.alice_next_swap().await;
assert_eq!(alice_swap.swap_id, bob_swap_id);
Ok(())
})
.await;
}

View file

@ -9,6 +9,7 @@ use swap::protocol::{alice, bob};
async fn given_bob_restarts_after_xmr_is_locked_resume_swap() { async fn given_bob_restarts_after_xmr_is_locked_resume_swap() {
harness::setup_test(SlowCancelConfig, |mut ctx| async move { harness::setup_test(SlowCancelConfig, |mut ctx| async move {
let (bob_swap, bob_join_handle) = ctx.bob_swap().await; let (bob_swap, bob_join_handle) = ctx.bob_swap().await;
let bob_swap_id = bob_swap.swap_id;
let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_xmr_locked)); let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_xmr_locked));
let alice_swap = ctx.alice_next_swap().await; let alice_swap = ctx.alice_next_swap().await;
@ -18,7 +19,9 @@ async fn given_bob_restarts_after_xmr_is_locked_resume_swap() {
assert!(matches!(bob_state, BobState::XmrLocked { .. })); assert!(matches!(bob_state, BobState::XmrLocked { .. }));
let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let (bob_swap, _) = ctx
.stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id)
.await;
assert!(matches!(bob_swap.state, BobState::XmrLocked { .. })); assert!(matches!(bob_swap.state, BobState::XmrLocked { .. }));
let bob_state = bob::run(bob_swap).await?; let bob_state = bob::run(bob_swap).await?;

View file

@ -9,6 +9,7 @@ use swap::protocol::{alice, bob};
async fn given_bob_restarts_after_xmr_is_locked_resume_swap() { async fn given_bob_restarts_after_xmr_is_locked_resume_swap() {
harness::setup_test(SlowCancelConfig, |mut ctx| async move { harness::setup_test(SlowCancelConfig, |mut ctx| async move {
let (bob_swap, bob_join_handle) = ctx.bob_swap().await; let (bob_swap, bob_join_handle) = ctx.bob_swap().await;
let bob_swap_id = bob_swap.swap_id;
let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_xmr_locked)); let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_xmr_locked));
let alice_swap = ctx.alice_next_swap().await; let alice_swap = ctx.alice_next_swap().await;
@ -18,7 +19,9 @@ async fn given_bob_restarts_after_xmr_is_locked_resume_swap() {
assert!(matches!(bob_state, BobState::XmrLocked { .. })); assert!(matches!(bob_state, BobState::XmrLocked { .. }));
let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let (bob_swap, _) = ctx
.stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id)
.await;
assert!(matches!(bob_swap.state, BobState::XmrLocked { .. })); assert!(matches!(bob_swap.state, BobState::XmrLocked { .. }));
let bob_state = bob::run(bob_swap).await?; let bob_state = bob::run(bob_swap).await?;

View file

@ -43,13 +43,52 @@ const BITCOIN_TEST_WALLET_NAME: &str = "testwallet";
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct StartingBalances { pub struct StartingBalances {
pub xmr: monero::Amount, pub xmr: monero::Amount,
pub xmr_outputs: Vec<monero::Amount>,
pub btc: bitcoin::Amount, pub btc: bitcoin::Amount,
} }
impl StartingBalances {
/// If monero_outputs is specified the monero balance will be:
/// monero_outputs * new_xmr = self_xmr
pub fn new(btc: bitcoin::Amount, xmr: monero::Amount, monero_outputs: Option<u64>) -> Self {
match monero_outputs {
None => {
if xmr == monero::Amount::ZERO {
return Self {
xmr,
xmr_outputs: vec![],
btc,
};
}
Self {
xmr,
xmr_outputs: vec![xmr],
btc,
}
}
Some(outputs) => {
let mut xmr_outputs = Vec::new();
let mut sum_xmr = monero::Amount::ZERO;
for _ in 0..outputs {
xmr_outputs.push(xmr);
sum_xmr = sum_xmr + xmr;
}
Self {
xmr: sum_xmr,
xmr_outputs,
btc,
}
}
}
}
}
struct BobParams { struct BobParams {
seed: Seed, seed: Seed,
db_path: PathBuf, db_path: PathBuf,
swap_id: Uuid,
bitcoin_wallet: Arc<bitcoin::Wallet>, bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>, monero_wallet: Arc<monero::Wallet>,
alice_address: Multiaddr, alice_address: Multiaddr,
@ -58,12 +97,16 @@ struct BobParams {
} }
impl BobParams { impl BobParams {
pub async fn builder(&self, event_loop_handle: bob::EventLoopHandle) -> Result<bob::Builder> { pub async fn builder(
&self,
event_loop_handle: bob::EventLoopHandle,
swap_id: Uuid,
) -> Result<bob::Builder> {
let receive_address = self.monero_wallet.get_main_address(); let receive_address = self.monero_wallet.get_main_address();
Ok(bob::Builder::new( Ok(bob::Builder::new(
Database::open(&self.db_path.clone().as_path()).unwrap(), Database::open(&self.db_path.clone().as_path()).unwrap(),
self.swap_id, swap_id,
self.bitcoin_wallet.clone(), self.bitcoin_wallet.clone(),
self.monero_wallet.clone(), self.monero_wallet.clone(),
self.env_config, self.env_config,
@ -72,11 +115,16 @@ impl BobParams {
)) ))
} }
pub fn new_eventloop(&self) -> Result<(bob::EventLoop, bob::EventLoopHandle)> { pub fn new_eventloop(&self, swap_id: Uuid) -> Result<(bob::EventLoop, bob::EventLoopHandle)> {
let mut swarm = swarm::new::<bob::Behaviour>(&self.seed)?; let mut swarm = swarm::new::<bob::Behaviour>(&self.seed)?;
swarm.add_address(self.alice_peer_id, self.alice_address.clone()); swarm.add_address(self.alice_peer_id, self.alice_address.clone());
bob::EventLoop::new(swarm, self.alice_peer_id, self.bitcoin_wallet.clone()) bob::EventLoop::new(
swap_id,
swarm,
self.alice_peer_id,
self.bitcoin_wallet.clone(),
)
} }
} }
@ -139,24 +187,28 @@ impl TestContext {
} }
pub async fn alice_next_swap(&mut self) -> alice::Swap { pub async fn alice_next_swap(&mut self) -> alice::Swap {
timeout(Duration::from_secs(10), self.alice_swap_handle.recv()) timeout(Duration::from_secs(20), self.alice_swap_handle.recv())
.await .await
.expect("No Alice swap within 10 seconds, aborting because this test is waiting for a swap forever...") .expect("No Alice swap within 20 seconds, aborting because this test is likely waiting for a swap forever...")
.unwrap() .unwrap()
} }
pub async fn bob_swap(&mut self) -> (bob::Swap, BobApplicationHandle) { pub async fn bob_swap(&mut self) -> (bob::Swap, BobApplicationHandle) {
let (event_loop, event_loop_handle) = self.bob_params.new_eventloop().unwrap(); let swap_id = Uuid::new_v4();
let (event_loop, event_loop_handle) = self.bob_params.new_eventloop(swap_id).unwrap();
let swap = self let swap = self
.bob_params .bob_params
.builder(event_loop_handle) .builder(event_loop_handle, swap_id)
.await .await
.unwrap() .unwrap()
.with_init_params(self.btc_amount) .with_init_params(self.btc_amount)
.build() .build()
.unwrap(); .unwrap();
// ensure the wallet is up to date for concurrent swap tests
swap.bitcoin_wallet.sync().await.unwrap();
let join_handle = tokio::spawn(event_loop.run()); let join_handle = tokio::spawn(event_loop.run());
(swap, BobApplicationHandle(join_handle)) (swap, BobApplicationHandle(join_handle))
@ -165,14 +217,15 @@ impl TestContext {
pub async fn stop_and_resume_bob_from_db( pub async fn stop_and_resume_bob_from_db(
&mut self, &mut self,
join_handle: BobApplicationHandle, join_handle: BobApplicationHandle,
swap_id: Uuid,
) -> (bob::Swap, BobApplicationHandle) { ) -> (bob::Swap, BobApplicationHandle) {
join_handle.abort(); join_handle.abort();
let (event_loop, event_loop_handle) = self.bob_params.new_eventloop().unwrap(); let (event_loop, event_loop_handle) = self.bob_params.new_eventloop(swap_id).unwrap();
let swap = self let swap = self
.bob_params .bob_params
.builder(event_loop_handle) .builder(event_loop_handle, swap_id)
.await .await
.unwrap() .unwrap()
.build() .build()
@ -482,21 +535,20 @@ where
let cli = Cli::default(); let cli = Cli::default();
let _guard = tracing_subscriber::fmt() let _guard = tracing_subscriber::fmt()
.with_env_filter("warn,swap=debug,monero_harness=debug,monero_rpc=info,bitcoin_harness=info,testcontainers=info") .with_env_filter("warn,swap=debug,monero_harness=debug,monero_rpc=debug,bitcoin_harness=info,testcontainers=info")
.with_test_writer() .with_test_writer()
.set_default(); .set_default();
let env_config = C::get_config(); let env_config = C::get_config();
let (monero, containers) = harness::init_containers(&cli).await; let (monero, containers) = harness::init_containers(&cli).await;
monero.init_miner().await.unwrap();
let btc_amount = bitcoin::Amount::from_sat(1_000_000); let btc_amount = bitcoin::Amount::from_sat(1_000_000);
let xmr_amount = monero::Amount::from_monero(btc_amount.as_btc() / FixedRate::RATE).unwrap(); let xmr_amount = monero::Amount::from_monero(btc_amount.as_btc() / FixedRate::RATE).unwrap();
let alice_starting_balances = StartingBalances { let alice_starting_balances =
xmr: xmr_amount * 10, StartingBalances::new(bitcoin::Amount::ZERO, xmr_amount, Some(10));
btc: bitcoin::Amount::ZERO,
};
let electrs_rpc_port = containers let electrs_rpc_port = containers
.electrs .electrs
@ -532,10 +584,7 @@ where
); );
let bob_seed = Seed::random().unwrap(); let bob_seed = Seed::random().unwrap();
let bob_starting_balances = StartingBalances { let bob_starting_balances = StartingBalances::new(btc_amount * 10, monero::Amount::ZERO, None);
xmr: monero::Amount::ZERO,
btc: btc_amount * 10,
};
let (bob_bitcoin_wallet, bob_monero_wallet) = init_test_wallets( let (bob_bitcoin_wallet, bob_monero_wallet) = init_test_wallets(
MONERO_WALLET_NAME_BOB, MONERO_WALLET_NAME_BOB,
@ -552,7 +601,6 @@ where
let bob_params = BobParams { let bob_params = BobParams {
seed: Seed::random().unwrap(), seed: Seed::random().unwrap(),
db_path: tempdir().unwrap().path().to_path_buf(), db_path: tempdir().unwrap().path().to_path_buf(),
swap_id: Uuid::new_v4(),
bitcoin_wallet: bob_bitcoin_wallet.clone(), bitcoin_wallet: bob_bitcoin_wallet.clone(),
monero_wallet: bob_monero_wallet.clone(), monero_wallet: bob_monero_wallet.clone(),
alice_address: alice_listen_address.clone(), alice_address: alice_listen_address.clone(),
@ -560,6 +608,8 @@ where
env_config, env_config,
}; };
monero.start_miner().await.unwrap();
let test = TestContext { let test = TestContext {
env_config, env_config,
btc_amount, btc_amount,
@ -774,7 +824,14 @@ async fn init_test_wallets(
env_config: Config, env_config: Config,
) -> (Arc<bitcoin::Wallet>, Arc<monero::Wallet>) { ) -> (Arc<bitcoin::Wallet>, Arc<monero::Wallet>) {
monero monero
.init(vec![(name, starting_balances.xmr.as_piconero())]) .init_wallet(
name,
starting_balances
.xmr_outputs
.into_iter()
.map(|amount| amount.as_piconero())
.collect(),
)
.await .await
.unwrap(); .unwrap();

View file

@ -11,6 +11,7 @@ use swap::protocol::{alice, bob};
async fn alice_punishes_if_bob_never_acts_after_fund() { async fn alice_punishes_if_bob_never_acts_after_fund() {
harness::setup_test(FastPunishConfig, |mut ctx| async move { harness::setup_test(FastPunishConfig, |mut ctx| async move {
let (bob_swap, bob_join_handle) = ctx.bob_swap().await; let (bob_swap, bob_join_handle) = ctx.bob_swap().await;
let bob_swap_id = bob_swap.swap_id;
let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked));
let alice_swap = ctx.alice_next_swap().await; let alice_swap = ctx.alice_next_swap().await;
@ -24,7 +25,9 @@ async fn alice_punishes_if_bob_never_acts_after_fund() {
// Restart Bob after Alice punished to ensure Bob transitions to // Restart Bob after Alice punished to ensure Bob transitions to
// punished and does not run indefinitely // punished and does not run indefinitely
let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; let (bob_swap, _) = ctx
.stop_and_resume_bob_from_db(bob_join_handle, bob_swap_id)
.await;
assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); assert!(matches!(bob_swap.state, BobState::BtcLocked { .. }));
let bob_state = bob::run(bob_swap).await?; let bob_state = bob::run(bob_swap).await?;