Merge pull request #100 from comit-network/Bob-restart

Merging now, will tackle comments in follow-up.
This commit is contained in:
Franck Royer 2020-12-21 14:09:08 +11:00 committed by GitHub
commit 7d3b2faedd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 460 additions and 147 deletions

View File

@ -105,7 +105,7 @@ jobs:
run: cargo +nightly test --workspace --all-features -- -Z unstable-options --report-time
env:
MONERO_ADDITIONAL_SLEEP_PERIOD: 60000
RUST_MIN_STACK: 10000000
RUST_MIN_STACK: 16777216 # 16 MB. Default is 8MB. This is fine as in tests we start 2 programs: Alice and Bob.
- name: Build binary
run: |

7
Cargo.lock generated
View File

@ -1151,6 +1151,12 @@ dependencies = [
"version_check",
]
[[package]]
name = "get-port"
version = "3.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac6c41a39c60ae1fc5bf0e220347ce90fa1e4bb0fcdac65b09bb5f4576bebc84"
[[package]]
name = "get_if_addrs"
version = "0.5.2"
@ -3312,6 +3318,7 @@ dependencies = [
"ecdsa_fun",
"futures",
"genawaiter",
"get-port",
"hyper",
"libp2p",
"libp2p-tokio-socks5",

View File

@ -48,6 +48,7 @@ void = "1"
xmr-btc = { path = "../xmr-btc" }
[dev-dependencies]
get-port = "3"
hyper = "0.13"
port_check = "0.1"
spectral = "0.6"

View File

@ -242,4 +242,8 @@ impl EventLoop {
}
}
}
pub fn peer_id(&self) -> PeerId {
self.swarm.peer_id()
}
}

View File

@ -13,7 +13,7 @@
#![forbid(unsafe_code)]
use anyhow::{Context, Result};
use libp2p::core::Multiaddr;
use libp2p::{core::Multiaddr, PeerId};
use prettytable::{row, Table};
use rand::rngs::OsRng;
use std::{convert::TryFrom, sync::Arc};
@ -111,6 +111,7 @@ async fn main() -> Result<()> {
.await?;
}
Command::BuyXmr {
alice_peer_id,
alice_addr,
bitcoind_url,
bitcoin_wallet_name,
@ -144,7 +145,7 @@ async fn main() -> Result<()> {
let bob_state = BobState::Started {
state0,
amounts,
addr: alice_addr,
alice_peer_id: alice_peer_id.clone(),
};
let swap_id = Uuid::new_v4();
@ -153,7 +154,16 @@ async fn main() -> Result<()> {
send_bitcoin, receive_monero, swap_id
);
bob_swap(swap_id, bob_state, bitcoin_wallet, monero_wallet, db).await?;
bob_swap(
swap_id,
bob_state,
bitcoin_wallet,
monero_wallet,
db,
alice_peer_id,
alice_addr,
)
.await?;
}
Command::History => {
let mut table = Table::new();
@ -173,6 +183,8 @@ async fn main() -> Result<()> {
bitcoin_wallet_name,
monero_wallet_rpc_url,
listen_addr,
alice_peer_id,
alice_addr,
} => {
let db_swap = db.get_state(swap_id)?;
@ -202,7 +214,16 @@ async fn main() -> Result<()> {
config,
)
.await?;
bob_swap(swap_id, bob_state, bitcoin_wallet, monero_wallet, db).await?;
bob_swap(
swap_id,
bob_state,
bitcoin_wallet,
monero_wallet,
db,
alice_peer_id,
alice_addr,
)
.await?;
} else {
anyhow::bail!("Unable to construct swap state for swap with id {}")
}
@ -277,6 +298,8 @@ async fn bob_swap(
bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>,
db: Database,
alice_peer_id: PeerId,
alice_addr: Multiaddr,
) -> Result<BobState> {
let bob_behaviour = bob::Behaviour::default();
let bob_transport = build(bob_behaviour.identity())?;
@ -291,6 +314,8 @@ async fn bob_swap(
monero_wallet.clone(),
OsRng,
swap_id,
alice_peer_id,
alice_addr,
);
tokio::spawn(async move { event_loop.run().await });

View File

@ -10,7 +10,10 @@ use crate::{
SwapAmounts,
};
use anyhow::Result;
use libp2p::{core::identity::Keypair, NetworkBehaviour, PeerId};
use libp2p::{
core::{identity::Keypair, Multiaddr},
NetworkBehaviour, PeerId,
};
use tracing::{debug, info};
use xmr_btc::{
alice,
@ -159,9 +162,9 @@ impl Behaviour {
debug!("Sent Message3");
}
/// Returns Alice's peer id if we are connected.
pub fn peer_id_of_alice(&self) -> Option<PeerId> {
self.pt.counterparty_peer_id()
/// Add a known address for the given peer
pub fn add_address(&mut self, peer_id: PeerId, address: Multiaddr) {
self.pt.add_address(peer_id, address)
}
}

View File

@ -9,7 +9,7 @@ use tokio::{
stream::StreamExt,
sync::mpsc::{Receiver, Sender},
};
use tracing::info;
use tracing::{debug, error, info};
use xmr_btc::{alice, bitcoin::EncryptedSignature, bob};
pub struct Channels<T> {
@ -36,7 +36,8 @@ pub struct EventLoopHandle {
msg2: Receiver<alice::Message2>,
request_amounts: Sender<(PeerId, ::bitcoin::Amount)>,
conn_established: Receiver<PeerId>,
dial_alice: Sender<Multiaddr>,
dial_alice: Sender<PeerId>,
add_address: Sender<(PeerId, Multiaddr)>,
send_msg0: Sender<(PeerId, bob::Message0)>,
send_msg1: Sender<(PeerId, bob::Message1)>,
send_msg2: Sender<(PeerId, bob::Message2)>,
@ -44,13 +45,6 @@ pub struct EventLoopHandle {
}
impl EventLoopHandle {
pub async fn recv_conn_established(&mut self) -> Result<PeerId> {
self.conn_established
.recv()
.await
.ok_or_else(|| anyhow!("Failed to receive connection established from Bob"))
}
pub async fn recv_message0(&mut self) -> Result<alice::Message0> {
self.msg0
.recv()
@ -72,9 +66,23 @@ impl EventLoopHandle {
.ok_or_else(|| anyhow!("Failed o receive message 2 from Bob"))
}
pub async fn dial_alice(&mut self, addr: Multiaddr) -> Result<()> {
info!("sending msg to ourselves to dial alice: {}", addr);
let _ = self.dial_alice.send(addr).await?;
/// Dials other party and wait for the connection to be established.
/// Do nothing if we are already connected
pub async fn dial(&mut self, peer_id: PeerId) -> Result<()> {
debug!("Attempt to dial Alice {}", peer_id);
let _ = self.dial_alice.send(peer_id).await?;
self.conn_established
.recv()
.await
.ok_or_else(|| anyhow!("Failed to receive connection established from Alice"))?;
Ok(())
}
pub async fn add_address(&mut self, peer_id: PeerId, addr: Multiaddr) -> Result<()> {
debug!("Attempt to add address {} for peer id {}", addr, peer_id);
self.add_address.send((peer_id, addr)).await?;
Ok(())
}
@ -119,7 +127,8 @@ pub struct EventLoop {
msg2: Sender<alice::Message2>,
conn_established: Sender<PeerId>,
request_amounts: Receiver<(PeerId, ::bitcoin::Amount)>,
dial_alice: Receiver<Multiaddr>,
dial_alice: Receiver<PeerId>,
add_address: Receiver<(PeerId, Multiaddr)>,
send_msg0: Receiver<(PeerId, bob::Message0)>,
send_msg1: Receiver<(PeerId, bob::Message1)>,
send_msg2: Receiver<(PeerId, bob::Message2)>,
@ -142,6 +151,7 @@ impl EventLoop {
let msg2 = Channels::new();
let conn_established = Channels::new();
let dial_alice = Channels::new();
let add_address = Channels::new();
let send_msg0 = Channels::new();
let send_msg1 = Channels::new();
let send_msg2 = Channels::new();
@ -155,6 +165,7 @@ impl EventLoop {
msg2: msg2.sender,
conn_established: conn_established.sender,
dial_alice: dial_alice.receiver,
add_address: add_address.receiver,
send_msg0: send_msg0.receiver,
send_msg1: send_msg1.receiver,
send_msg2: send_msg2.receiver,
@ -168,6 +179,7 @@ impl EventLoop {
msg2: msg2.receiver,
conn_established: conn_established.receiver,
dial_alice: dial_alice.sender,
add_address: add_address.sender,
send_msg0: send_msg0.sender,
send_msg1: send_msg1.sender,
send_msg2: send_msg2.sender,
@ -182,8 +194,8 @@ impl EventLoop {
tokio::select! {
swarm_event = self.swarm.next().fuse() => {
match swarm_event {
OutEvent::ConnectionEstablished(alice) => {
let _ = self.conn_established.send(alice).await;
OutEvent::ConnectionEstablished(peer_id) => {
let _ = self.conn_established.send(peer_id).await;
}
OutEvent::Amounts(_amounts) => info!("Amounts received from Alice"),
OutEvent::Message0(msg) => {
@ -198,10 +210,25 @@ impl EventLoop {
OutEvent::Message3 => info!("Alice acknowledged message 3 received"),
}
},
addr = self.dial_alice.next().fuse() => {
if let Some(addr) = addr {
info!("dialing alice: {}", addr);
libp2p::Swarm::dial_addr(&mut self.swarm, addr).expect("Could not dial alice");
peer_id_addr = self.add_address.next().fuse() => {
if let Some((peer_id, addr)) = peer_id_addr {
debug!("Add address for {}: {}", peer_id, addr);
self.swarm.add_address(peer_id, addr);
}
},
peer_id = self.dial_alice.next().fuse() => {
if let Some(peer_id) = peer_id {
if self.swarm.pt.is_connected(&peer_id) {
debug!("Already connected to Alice: {}", peer_id);
let _ = self.conn_established.send(peer_id).await;
} else {
info!("dialing alice: {}", peer_id);
if let Err(err) = libp2p::Swarm::dial(&mut self.swarm, &peer_id) {
error!("Could not dial alice: {}", err);
// TODO(Franck): If Dial fails then we should report it.
}
}
}
},
amounts = self.request_amounts.next().fuse() => {

View File

@ -23,12 +23,12 @@ pub enum BobState {
Started {
state0: bob::State0,
amounts: SwapAmounts,
addr: Multiaddr,
alice_peer_id: PeerId,
},
Negotiated(bob::State2, PeerId),
BtcLocked(bob::State3, PeerId),
XmrLocked(bob::State4, PeerId),
EncSigSent(bob::State4, PeerId),
EncSigSent(bob::State4),
BtcRedeemed(bob::State5),
T1Expired(bob::State4),
Cancelled(bob::State4),
@ -67,7 +67,7 @@ impl From<BobState> for state::Bob {
BobState::Negotiated(state2, peer_id) => Bob::Negotiated { state2, peer_id },
BobState::BtcLocked(state3, peer_id) => Bob::BtcLocked { state3, peer_id },
BobState::XmrLocked(state4, peer_id) => Bob::XmrLocked { state4, peer_id },
BobState::EncSigSent(state4, peer_id) => Bob::EncSigSent { state4, peer_id },
BobState::EncSigSent(state4) => Bob::EncSigSent { state4 },
BobState::BtcRedeemed(state5) => Bob::BtcRedeemed(state5),
BobState::T1Expired(state4) => Bob::T1Expired(state4),
BobState::Cancelled(state4) => Bob::BtcCancelled(state4),
@ -88,7 +88,7 @@ impl TryFrom<state::Swap> for BobState {
Bob::Negotiated { state2, peer_id } => BobState::Negotiated(state2, peer_id),
Bob::BtcLocked { state3, peer_id } => BobState::BtcLocked(state3, peer_id),
Bob::XmrLocked { state4, peer_id } => BobState::XmrLocked(state4, peer_id),
Bob::EncSigSent { state4, peer_id } => BobState::EncSigSent(state4, peer_id),
Bob::EncSigSent { state4 } => BobState::EncSigSent(state4),
Bob::BtcRedeemed(state5) => BobState::BtcRedeemed(state5),
Bob::T1Expired(state4) => BobState::T1Expired(state4),
Bob::BtcCancelled(state4) => BobState::Cancelled(state4),
@ -102,18 +102,26 @@ impl TryFrom<state::Swap> for BobState {
}
}
// TODO(Franck): Make this a method on a struct
#[allow(clippy::too_many_arguments)]
pub async fn swap<R>(
state: BobState,
event_loop_handle: EventLoopHandle,
mut event_loop_handle: EventLoopHandle,
db: Database,
bitcoin_wallet: Arc<crate::bitcoin::Wallet>,
monero_wallet: Arc<crate::monero::Wallet>,
rng: R,
swap_id: Uuid,
alice_peer_id: PeerId,
alice_addr: Multiaddr,
) -> Result<BobState>
where
R: RngCore + CryptoRng + Send,
{
event_loop_handle
.add_address(alice_peer_id, alice_addr)
.await?;
run_until(
state,
is_complete,
@ -173,13 +181,15 @@ where
BobState::Started {
state0,
amounts,
addr,
alice_peer_id,
} => {
event_loop_handle.dial(alice_peer_id.clone()).await?;
let (state2, alice_peer_id) = negotiate(
state0,
amounts,
&mut event_loop_handle,
addr,
alice_peer_id.clone(),
&mut rng,
bitcoin_wallet.clone(),
)
@ -202,6 +212,8 @@ where
.await
}
BobState::Negotiated(state2, alice_peer_id) => {
// Do not lock Bitcoin if not connected to Alice.
event_loop_handle.dial(alice_peer_id.clone()).await?;
// Alice and Bob have exchanged info
let state3 = state2.lock_btc(bitcoin_wallet.as_ref()).await?;
@ -224,7 +236,10 @@ where
// Bob has locked Btc
// Watch for Alice to Lock Xmr or for t1 to elapse
BobState::BtcLocked(state3, alice_peer_id) => {
// todo: watch until t1, not indefinetely
// TODO(Franck): Refund if cannot connect to Alice.
event_loop_handle.dial(alice_peer_id.clone()).await?;
// todo: watch until t1, not indefinitely
let msg2 = event_loop_handle.recv_message2().await?;
let state4 = state3
.watch_for_lock_xmr(monero_wallet.as_ref(), msg2)
@ -247,12 +262,16 @@ where
.await
}
BobState::XmrLocked(state, alice_peer_id) => {
// TODO(Franck): Refund if cannot connect to Alice.
event_loop_handle.dial(alice_peer_id.clone()).await?;
let state = if let Epoch::T0 = state.current_epoch(bitcoin_wallet.as_ref()).await? {
// Alice has locked Xmr
// Bob sends Alice his key
let tx_redeem_encsig = state.tx_redeem_encsig();
let state4_clone = state.clone();
// TODO(Franck): Refund if message cannot be sent.
let enc_sig_sent_watcher =
event_loop_handle.send_message3(alice_peer_id.clone(), tx_redeem_encsig);
let bitcoin_wallet = bitcoin_wallet.clone();
@ -260,7 +279,7 @@ where
select! {
_ = enc_sig_sent_watcher => {
BobState::EncSigSent(state, alice_peer_id)
BobState::EncSigSent(state)
},
_ = t1_timeout => {
BobState::T1Expired(state)
@ -284,7 +303,7 @@ where
)
.await
}
BobState::EncSigSent(state, ..) => {
BobState::EncSigSent(state) => {
let state = if let Epoch::T0 = state.current_epoch(bitcoin_wallet.as_ref()).await? {
let state_clone = state.clone();
let redeem_watcher = state_clone.watch_for_redeem_btc(bitcoin_wallet.as_ref());
@ -401,7 +420,7 @@ pub async fn negotiate<R>(
state0: xmr_btc::bob::State0,
amounts: SwapAmounts,
swarm: &mut EventLoopHandle,
addr: Multiaddr,
alice_peer_id: PeerId,
mut rng: R,
bitcoin_wallet: Arc<crate::bitcoin::Wallet>,
) -> Result<(State2, PeerId)>
@ -409,10 +428,6 @@ where
R: RngCore + CryptoRng + Send,
{
tracing::trace!("Starting negotiate");
swarm.dial_alice(addr).await?;
let alice_peer_id = swarm.recv_conn_established().await?;
swarm
.request_amounts(alice_peer_id.clone(), amounts.btc)
.await?;

View File

@ -1,4 +1,4 @@
use libp2p::core::Multiaddr;
use libp2p::{core::Multiaddr, PeerId};
use url::Url;
use uuid::Uuid;
@ -47,6 +47,9 @@ pub enum Command {
receive_bitcoin: bitcoin::Amount,
},
BuyXmr {
#[structopt(short = "p", long = "connect-peer-id")]
alice_peer_id: PeerId,
#[structopt(short = "a", long = "connect-addr")]
alice_addr: Multiaddr,
@ -78,6 +81,12 @@ pub enum Command {
#[structopt(short = "id", long = "swap-id")]
swap_id: Uuid,
#[structopt(short = "p", long = "connect-peer-id")]
alice_peer_id: PeerId,
#[structopt(short = "a", long = "connect-addr")]
alice_addr: Multiaddr,
#[structopt(
short = "b",
long = "bitcoind-rpc",

View File

@ -7,7 +7,10 @@ use libp2p::{
},
Multiaddr, PeerId,
};
use std::{collections::VecDeque, task::Poll};
use std::{
collections::{HashMap, VecDeque},
task::Poll,
};
#[derive(Debug)]
pub enum OutEvent {
@ -21,10 +24,21 @@ pub enum OutEvent {
#[derive(Default, Debug)]
pub struct PeerTracker {
connected: Option<(PeerId, Multiaddr)>,
address_of_peer: HashMap<PeerId, Multiaddr>,
events: VecDeque<OutEvent>,
}
impl PeerTracker {
/// Return whether we are connected to the given peer.
pub fn is_connected(&self, peer_id: &PeerId) -> bool {
if let Some((connected_peer_id, _)) = &self.connected {
if connected_peer_id == peer_id {
return true;
}
}
false
}
/// Returns the peer id of counterparty if we are connected.
pub fn counterparty_peer_id(&self) -> Option<PeerId> {
if let Some((id, _)) = &self.connected {
@ -33,13 +47,18 @@ impl PeerTracker {
None
}
/// Returns the multiaddr of counterparty if we are connected.
pub fn counterparty_addr(&self) -> Option<Multiaddr> {
if let Some((_, addr)) = &self.connected {
return Some(addr.clone());
/// Returns the peer_id and multiaddr of counterparty if we are connected.
pub fn counterparty(&self) -> Option<(PeerId, Multiaddr)> {
if let Some((peer_id, addr)) = &self.connected {
return Some((peer_id.clone(), addr.clone()));
}
None
}
/// Add an address for a given peer. We only store one address per peer.
pub fn add_address(&mut self, peer_id: PeerId, address: Multiaddr) {
self.address_of_peer.insert(peer_id, address);
}
}
impl NetworkBehaviour for PeerTracker {
@ -50,11 +69,17 @@ impl NetworkBehaviour for PeerTracker {
DummyProtocolsHandler::default()
}
fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
let mut addresses: Vec<Multiaddr> = vec![];
if let Some(addr) = self.counterparty_addr() {
addresses.push(addr)
if let Some((counterparty_peer_id, addr)) = self.counterparty() {
if counterparty_peer_id == *peer_id {
addresses.push(addr)
}
}
if let Some(addr) = self.address_of_peer.get(peer_id) {
addresses.push(addr.clone());
}
addresses

View File

@ -56,8 +56,6 @@ pub enum Bob {
},
EncSigSent {
state4: bob::State4,
#[serde(with = "crate::serde::peer_id")]
peer_id: PeerId,
},
BtcRedeemed(bob::State5),
T1Expired(bob::State4),

View File

@ -1,5 +1,9 @@
use crate::testutils::{init_alice, init_bob};
use futures::future::try_join;
use futures::{
future::{join, select},
FutureExt,
};
use get_port::get_port;
use libp2p::Multiaddr;
use rand::rngs::OsRng;
use swap::{alice, bob};
@ -35,8 +39,8 @@ async fn happy_path() {
let xmr_alice = xmr_to_swap * 10;
let xmr_bob = xmr_btc::monero::Amount::ZERO;
// todo: This should not be hardcoded
let alice_multiaddr: Multiaddr = "/ip4/127.0.0.1/tcp/9876"
let port = get_port().expect("Failed to find a free port");
let alice_multiaddr: Multiaddr = format!("/ip4/127.0.0.1/tcp/{}", port)
.parse()
.expect("failed to parse Alice's address");
@ -62,7 +66,8 @@ async fn happy_path() {
let (bob_state, bob_event_loop, bob_event_loop_handle, bob_btc_wallet, bob_xmr_wallet, bob_db) =
init_bob(
alice_multiaddr,
alice_multiaddr.clone(),
alice_event_loop.peer_id(),
&bitcoind,
&monero,
btc_to_swap,
@ -80,9 +85,11 @@ async fn happy_path() {
config,
Uuid::new_v4(),
alice_db,
);
)
.boxed();
let _alice_swarm_fut = tokio::spawn(async move { alice_event_loop.run().await });
let alice_peer_id = alice_event_loop.peer_id();
let alice_fut = select(alice_swap_fut, alice_event_loop.run().boxed());
let bob_swap_fut = bob::swap::swap(
bob_state,
@ -92,11 +99,14 @@ async fn happy_path() {
bob_xmr_wallet.clone(),
OsRng,
Uuid::new_v4(),
);
alice_peer_id,
alice_multiaddr,
)
.boxed();
let _bob_swarm_fut = tokio::spawn(async move { bob_event_loop.run().await });
let bob_fut = select(bob_swap_fut, bob_event_loop.run().boxed());
try_join(alice_swap_fut, bob_swap_fut).await.unwrap();
join(alice_fut, bob_fut).await;
let btc_alice_final = alice_btc_wallet.as_ref().balance().await.unwrap();
let btc_bob_final = bob_btc_wallet.as_ref().balance().await.unwrap();

View File

@ -1,17 +1,17 @@
use crate::testutils::{init_alice, init_bob};
use get_port::get_port;
use libp2p::Multiaddr;
use rand::rngs::OsRng;
use std::convert::TryFrom;
use swap::{alice, alice::swap::AliceState, bitcoin, bob, storage::Database};
use tempfile::tempdir;
use testcontainers::clients::Cli;
use testutils::init_tracing;
use uuid::Uuid;
use xmr_btc::config::Config;
pub mod testutils;
use crate::testutils::{init_alice, init_bob};
use std::convert::TryFrom;
use testutils::init_tracing;
#[tokio::test]
async fn given_alice_restarts_after_encsig_is_learned_resume_swap() {
let _guard = init_tracing();
@ -31,7 +31,8 @@ async fn given_alice_restarts_after_encsig_is_learned_resume_swap() {
let bob_btc_starting_balance = btc_to_swap * 10;
let alice_xmr_starting_balance = xmr_to_swap * 10;
let alice_multiaddr: Multiaddr = "/ip4/127.0.0.1/tcp/9877"
let port = get_port().expect("Failed to find a free port");
let alice_multiaddr: Multiaddr = format!("/ip4/127.0.0.1/tcp/{}", port)
.parse()
.expect("failed to parse Alice's address");
@ -55,9 +56,12 @@ async fn given_alice_restarts_after_encsig_is_learned_resume_swap() {
)
.await;
let alice_peer_id = alice_event_loop.peer_id();
let (bob_state, bob_event_loop, bob_event_loop_handle, bob_btc_wallet, bob_xmr_wallet, bob_db) =
init_bob(
alice_multiaddr.clone(),
alice_peer_id.clone(),
&bitcoind,
&monero,
btc_to_swap,
@ -72,25 +76,24 @@ async fn given_alice_restarts_after_encsig_is_learned_resume_swap() {
let bob_btc_wallet_clone = bob_btc_wallet.clone();
let bob_xmr_wallet_clone = bob_xmr_wallet.clone();
let _ = tokio::spawn(async move {
bob::swap::swap(
bob_state,
bob_event_loop_handle,
bob_db,
bob_btc_wallet.clone(),
bob_xmr_wallet.clone(),
OsRng,
Uuid::new_v4(),
)
.await
});
let _bob_swarm_fut = tokio::spawn(async move { bob_event_loop.run().await });
let bob_fut = bob::swap::swap(
bob_state,
bob_event_loop_handle,
bob_db,
bob_btc_wallet.clone(),
bob_xmr_wallet.clone(),
OsRng,
Uuid::new_v4(),
alice_peer_id,
alice_multiaddr.clone(),
);
let alice_db_datadir = tempdir().unwrap();
let alice_db = Database::open(alice_db_datadir.path()).unwrap();
let _alice_swarm_fut = tokio::spawn(async move { alice_event_loop.run().await });
tokio::spawn(async move { alice_event_loop.run().await });
tokio::spawn(bob_fut);
tokio::spawn(bob_event_loop.run());
let alice_swap_id = Uuid::new_v4();

View File

@ -1,18 +1,17 @@
use crate::testutils::{init_alice, init_bob};
use get_port::get_port;
use libp2p::Multiaddr;
use rand::rngs::OsRng;
use swap::{alice, bitcoin, bob, storage::Database};
use std::convert::TryFrom;
use swap::{alice, bitcoin, bob, bob::swap::BobState, storage::Database};
use tempfile::tempdir;
use testcontainers::clients::Cli;
use testutils::init_tracing;
use uuid::Uuid;
use xmr_btc::config::Config;
pub mod testutils;
use crate::testutils::{init_alice, init_bob};
use std::convert::TryFrom;
use swap::bob::swap::BobState;
use testutils::init_tracing;
#[tokio::test]
async fn given_bob_restarts_after_encsig_is_sent_resume_swap() {
let _guard = init_tracing();
@ -32,7 +31,8 @@ async fn given_bob_restarts_after_encsig_is_sent_resume_swap() {
let bob_btc_starting_balance = btc_to_swap * 10;
let alice_xmr_starting_balance = xmr_to_swap * 10;
let alice_multiaddr: Multiaddr = "/ip4/127.0.0.1/tcp/9877"
let port = get_port().expect("Failed to find a free port");
let alice_multiaddr: Multiaddr = format!("/ip4/127.0.0.1/tcp/{}", port)
.parse()
.expect("failed to parse Alice's address");
@ -56,9 +56,11 @@ async fn given_bob_restarts_after_encsig_is_sent_resume_swap() {
)
.await;
let alice_peer_id = alice_event_loop.peer_id();
let (bob_state, bob_event_loop, bob_event_loop_handle, bob_btc_wallet, bob_xmr_wallet, _) =
init_bob(
alice_multiaddr.clone(),
alice_peer_id.clone(),
&bitcoind,
&monero,
btc_to_swap,
@ -136,6 +138,8 @@ async fn given_bob_restarts_after_encsig_is_sent_resume_swap() {
bob_xmr_wallet,
OsRng,
bob_swap_id,
alice_peer_id,
alice_multiaddr,
)
.await
.unwrap();

View File

@ -0,0 +1,153 @@
use crate::testutils::{init_alice, init_bob};
use get_port::get_port;
use libp2p::Multiaddr;
use rand::rngs::OsRng;
use swap::{alice, alice::swap::AliceState, bitcoin, bob, bob::swap::BobState, storage::Database};
use tempfile::tempdir;
use testcontainers::clients::Cli;
use testutils::init_tracing;
use tokio::select;
use uuid::Uuid;
use xmr_btc::config::Config;
pub mod testutils;
#[tokio::test]
async fn given_bob_restarts_after_xmr_is_locked_resume_swap() {
let _guard = init_tracing();
let cli = Cli::default();
let (
monero,
testutils::Containers {
bitcoind,
monerods: _monerods,
},
) = testutils::init_containers(&cli).await;
let btc_to_swap = bitcoin::Amount::from_sat(1_000_000);
let xmr_to_swap = xmr_btc::monero::Amount::from_piconero(1_000_000_000_000);
let bob_btc_starting_balance = btc_to_swap * 10;
let bob_xmr_starting_balance = xmr_btc::monero::Amount::from_piconero(0);
let alice_btc_starting_balance = bitcoin::Amount::ZERO;
let alice_xmr_starting_balance = xmr_to_swap * 10;
let port = get_port().expect("Failed to find a free port");
let alice_multiaddr: Multiaddr = format!("/ip4/127.0.0.1/tcp/{}", port)
.parse()
.expect("failed to parse Alice's address");
let (
alice_state,
mut alice_event_loop,
alice_event_loop_handle,
alice_btc_wallet,
alice_xmr_wallet,
alice_db,
) = init_alice(
&bitcoind,
&monero,
btc_to_swap,
xmr_to_swap,
alice_xmr_starting_balance,
alice_multiaddr.clone(),
Config::regtest(),
)
.await;
let alice_peer_id = alice_event_loop.peer_id();
let (bob_state, bob_event_loop_1, bob_event_loop_handle_1, bob_btc_wallet, bob_xmr_wallet, _) =
init_bob(
alice_multiaddr.clone(),
alice_peer_id.clone(),
&bitcoind,
&monero,
btc_to_swap,
bob_btc_starting_balance,
xmr_to_swap,
Config::regtest(),
)
.await;
let alice_fut = alice::swap::swap(
alice_state,
alice_event_loop_handle,
alice_btc_wallet.clone(),
alice_xmr_wallet.clone(),
Config::regtest(),
Uuid::new_v4(),
alice_db,
);
let bob_swap_id = Uuid::new_v4();
let bob_db_datadir = tempdir().unwrap();
let bob_xmr_locked_fut = {
let bob_db = Database::open(bob_db_datadir.path()).unwrap();
bob::swap::run_until(
bob_state,
bob::swap::is_xmr_locked,
bob_event_loop_handle_1,
bob_db,
bob_btc_wallet.clone(),
bob_xmr_wallet.clone(),
OsRng,
bob_swap_id,
)
};
tokio::spawn(async move { alice_event_loop.run().await });
let alice_fut_handle = tokio::spawn(alice_fut);
// We are selecting with bob_event_loop_1 so that we stop polling on it once
// bob reaches `xmr locked` state.
let bob_restart_state = select! {
res = bob_xmr_locked_fut => res.unwrap(),
_ = bob_event_loop_1.run() => panic!("The event loop should never finish")
};
let (bob_event_loop_2, bob_event_loop_handle_2) = testutils::init_bob_event_loop();
let bob_fut = bob::swap::swap(
bob_restart_state,
bob_event_loop_handle_2,
Database::open(bob_db_datadir.path()).unwrap(),
bob_btc_wallet.clone(),
bob_xmr_wallet.clone(),
OsRng,
bob_swap_id,
alice_peer_id,
alice_multiaddr,
);
let bob_final_state = select! {
bob_final_state = bob_fut => bob_final_state.unwrap(),
_ = bob_event_loop_2.run() => panic!("Event loop is not expected to stop")
};
assert!(matches!(bob_final_state, BobState::XmrRedeemed));
// Wait for Alice to finish too.
let alice_final_state = alice_fut_handle.await.unwrap().unwrap();
assert!(matches!(alice_final_state, AliceState::BtcRedeemed));
let btc_alice_final = alice_btc_wallet.as_ref().balance().await.unwrap();
let btc_bob_final = bob_btc_wallet.as_ref().balance().await.unwrap();
let xmr_alice_final = alice_xmr_wallet.as_ref().get_balance().await.unwrap();
bob_xmr_wallet.as_ref().0.refresh().await.unwrap();
let xmr_bob_final = bob_xmr_wallet.as_ref().get_balance().await.unwrap();
assert_eq!(
btc_alice_final,
alice_btc_starting_balance + btc_to_swap - bitcoin::Amount::from_sat(bitcoin::TX_FEE)
);
assert!(btc_bob_final <= bob_btc_starting_balance - btc_to_swap);
assert!(xmr_alice_final <= alice_xmr_starting_balance - xmr_to_swap);
assert_eq!(xmr_bob_final, bob_xmr_starting_balance + xmr_to_swap);
}

View File

@ -1,5 +1,9 @@
use crate::testutils::{init_alice, init_bob};
use futures::future::try_join;
use futures::{
future::{join, select, Either},
FutureExt,
};
use get_port::get_port;
use libp2p::Multiaddr;
use rand::rngs::OsRng;
use swap::{alice, alice::swap::AliceState, bob, bob::swap::BobState};
@ -33,8 +37,8 @@ async fn alice_punishes_if_bob_never_acts_after_fund() {
let alice_btc_starting_balance = bitcoin::Amount::ZERO;
let alice_xmr_starting_balance = xmr_to_swap * 10;
// todo: This should not be hardcoded
let alice_multiaddr: Multiaddr = "/ip4/127.0.0.1/tcp/9877"
let port = get_port().expect("Failed to find a free port");
let alice_multiaddr: Multiaddr = format!("/ip4/127.0.0.1/tcp/{}", port)
.parse()
.expect("failed to parse Alice's address");
@ -61,6 +65,7 @@ async fn alice_punishes_if_bob_never_acts_after_fund() {
let (bob_state, bob_event_loop, bob_event_loop_handle, bob_btc_wallet, bob_xmr_wallet, bob_db) =
init_bob(
alice_multiaddr,
alice_event_loop.peer_id(),
&bitcoind,
&monero,
btc_to_swap,
@ -79,9 +84,10 @@ async fn alice_punishes_if_bob_never_acts_after_fund() {
bob_xmr_wallet.clone(),
OsRng,
Uuid::new_v4(),
);
)
.boxed();
let _bob_swarm_fut = tokio::spawn(async move { bob_event_loop.run().await });
let bob_fut = select(bob_btc_locked_fut, bob_event_loop.run().boxed());
let alice_fut = alice::swap::swap(
alice_state,
@ -91,12 +97,23 @@ async fn alice_punishes_if_bob_never_acts_after_fund() {
Config::regtest(),
Uuid::new_v4(),
alice_db,
);
)
.boxed();
let _alice_swarm_fut = tokio::spawn(async move { alice_event_loop.run().await });
let alice_fut = select(alice_fut, alice_event_loop.run().boxed());
// Wait until alice has locked xmr and bob has locked btc
let (alice_state, bob_state) = try_join(alice_fut, bob_btc_locked_fut).await.unwrap();
let (alice_state, bob_state) = join(alice_fut, bob_fut).await;
let alice_state = match alice_state {
Either::Left((state, _)) => state.unwrap(),
Either::Right(_) => panic!("Alice event loop should not terminate."),
};
let bob_state = match bob_state {
Either::Left((state, _)) => state.unwrap(),
Either::Right(_) => panic!("Bob event loop should not terminate."),
};
assert!(matches!(alice_state, AliceState::Punished));
let bob_state3 = if let BobState::BtcLocked(state3, ..) = bob_state {

View File

@ -1,11 +1,13 @@
use crate::testutils::{init_alice, init_bob};
use futures::future::try_join;
use get_port::get_port;
use libp2p::Multiaddr;
use rand::rngs::OsRng;
use swap::{alice, alice::swap::AliceState, bob, bob::swap::BobState, storage::Database};
use tempfile::tempdir;
use testcontainers::clients::Cli;
use testutils::init_tracing;
use tokio::select;
use uuid::Uuid;
use xmr_btc::{bitcoin, config::Config};
@ -14,7 +16,7 @@ pub mod testutils;
// Bob locks btc and Alice locks xmr. Alice fails to act so Bob refunds. Alice
// then also refunds.
#[tokio::test]
async fn both_refund() {
async fn given_alice_restarts_after_xmr_is_locked_abort_swap() {
let _guard = init_tracing();
let cli = Cli::default();
@ -35,15 +37,15 @@ async fn both_refund() {
let alice_btc_starting_balance = bitcoin::Amount::ZERO;
let alice_xmr_starting_balance = xmr_to_swap * 10;
// todo: This should not be hardcoded
let alice_multiaddr: Multiaddr = "/ip4/127.0.0.1/tcp/9879"
let port = get_port().expect("Failed to find a free port");
let alice_multiaddr: Multiaddr = format!("/ip4/127.0.0.1/tcp/{}", port)
.parse()
.expect("failed to parse Alice's address");
let (
alice_state,
mut alice_event_loop,
alice_event_loop_handle,
mut alice_event_loop_1,
alice_event_loop_handle_1,
alice_btc_wallet,
alice_xmr_wallet,
_,
@ -61,6 +63,7 @@ async fn both_refund() {
let (bob_state, bob_event_loop, bob_event_loop_handle, bob_btc_wallet, bob_xmr_wallet, bob_db) =
init_bob(
alice_multiaddr.clone(),
alice_event_loop_1.peer_id(),
&bitcoind,
&monero,
btc_to_swap,
@ -78,64 +81,67 @@ async fn both_refund() {
bob_xmr_wallet.clone(),
OsRng,
Uuid::new_v4(),
alice_event_loop_1.peer_id(),
alice_multiaddr.clone(),
);
tokio::spawn(async move { bob_event_loop.run().await });
let alice_swap_id = Uuid::new_v4();
let alice_db_datadir = tempdir().unwrap();
let alice_db = Database::open(alice_db_datadir.path()).unwrap();
let alice_xmr_locked_fut = alice::swap::run_until(
alice_state,
alice::swap::is_xmr_locked,
alice_event_loop_handle,
alice_btc_wallet.clone(),
alice_xmr_wallet.clone(),
Config::regtest(),
alice_swap_id,
alice_db,
);
let alice_xmr_locked_fut = {
let alice_db = Database::open(alice_db_datadir.path()).unwrap();
alice::swap::run_until(
alice_state,
alice::swap::is_xmr_locked,
alice_event_loop_handle_1,
alice_btc_wallet.clone(),
alice_xmr_wallet.clone(),
Config::regtest(),
alice_swap_id,
alice_db,
)
};
tokio::spawn(async move { alice_event_loop.run().await });
tokio::spawn(async move { bob_event_loop.run().await });
// Wait until alice has locked xmr and bob has locked btc
let (bob_state, alice_state) = try_join(bob_fut, alice_xmr_locked_fut).await.unwrap();
// We are selecting with alice_event_loop_1 so that we stop polling on it once
// the try_join is finished.
let (bob_state, alice_restart_state) = select! {
res = try_join(bob_fut, alice_xmr_locked_fut) => res.unwrap(),
_ = alice_event_loop_1.run() => panic!("The event loop should never finish")
};
let bob_state4 = if let BobState::BtcRefunded(state4) = bob_state {
state4
let tx_lock_id = if let BobState::BtcRefunded(state4) = bob_state {
state4.tx_lock_id()
} else {
panic!("Bob in unexpected state");
};
let alice_db = Database::open(alice_db_datadir.path()).unwrap();
let (mut alice_event_loop, alice_event_loop_handle) =
let (mut alice_event_loop_2, alice_event_loop_handle_2) =
testutils::init_alice_event_loop(alice_multiaddr);
let alice_state = alice::swap::swap(
alice_state,
alice_event_loop_handle,
alice_btc_wallet.clone(),
alice_xmr_wallet.clone(),
Config::regtest(),
alice_swap_id,
alice_db,
)
.await
.unwrap();
tokio::spawn(async move { alice_event_loop.run().await });
let alice_final_state = {
let alice_db = Database::open(alice_db_datadir.path()).unwrap();
alice::swap::swap(
alice_restart_state,
alice_event_loop_handle_2,
alice_btc_wallet.clone(),
alice_xmr_wallet.clone(),
Config::regtest(),
alice_swap_id,
alice_db,
)
.await
.unwrap()
};
tokio::spawn(async move { alice_event_loop_2.run().await });
assert!(matches!(alice_state, AliceState::XmrRefunded));
assert!(matches!(alice_final_state, AliceState::XmrRefunded));
let btc_alice_final = alice_btc_wallet.as_ref().balance().await.unwrap();
let btc_bob_final = bob_btc_wallet.as_ref().balance().await.unwrap();
// lock_tx_bitcoin_fee is determined by the wallet, it is not necessarily equal
// to TX_FEE
let lock_tx_bitcoin_fee = bob_btc_wallet
.transaction_fee(bob_state4.tx_lock_id())
.await
.unwrap();
let lock_tx_bitcoin_fee = bob_btc_wallet.transaction_fee(tx_lock_id).await.unwrap();
assert_eq!(btc_alice_final, alice_btc_starting_balance);

View File

@ -1,5 +1,5 @@
use bitcoin_harness::Bitcoind;
use libp2p::core::Multiaddr;
use libp2p::{core::Multiaddr, PeerId};
use monero_harness::{image, Monero};
use rand::rngs::OsRng;
use std::sync::Arc;
@ -155,7 +155,7 @@ pub async fn init_alice(
}
pub async fn init_bob_state(
alice_multiaddr: Multiaddr,
alice_peer_id: PeerId,
btc_to_swap: bitcoin::Amount,
xmr_to_swap: xmr_btc::monero::Amount,
bob_btc_wallet: Arc<bitcoin::Wallet>,
@ -179,7 +179,7 @@ pub async fn init_bob_state(
BobState::Started {
state0,
amounts,
addr: alice_multiaddr,
alice_peer_id,
}
}
@ -192,6 +192,7 @@ pub fn init_bob_event_loop() -> (bob::event_loop::EventLoop, bob::event_loop::Ev
#[allow(clippy::too_many_arguments)]
pub async fn init_bob(
alice_multiaddr: Multiaddr,
alice_peer_id: PeerId,
bitcoind: &Bitcoind<'_>,
monero: &Monero,
btc_to_swap: bitcoin::Amount,
@ -217,7 +218,7 @@ pub async fn init_bob(
.await;
let bob_state = init_bob_state(
alice_multiaddr,
alice_peer_id.clone(),
btc_to_swap,
xmr_to_swap,
bob_btc_wallet.clone(),
@ -225,7 +226,12 @@ pub async fn init_bob(
)
.await;
let (event_loop, event_loop_handle) = init_bob_event_loop();
let (event_loop, mut event_loop_handle) = init_bob_event_loop();
event_loop_handle
.add_address(alice_peer_id, alice_multiaddr)
.await
.unwrap();
let bob_db_dir = tempdir().unwrap();
let bob_db = Database::open(bob_db_dir.path()).unwrap();