143: Upgrade to tokio 1.0 r=D4nte a=rishflab

When we were refactoring tests we realised we probably want the ability to abort a `tokio::JoinHandle` to kill the `EventLoop` to simulate a real world crash. tokio 1.0 is needed for this. It is probably about time to upgrade tokio anyway. 

In order to upgrade to tokio 1.0 the following dependencies were also upgraded in the swap crate and monero-harness-rs
* backoff
* libp2p
* request

UPDATE: This should be merged until the following dependencies are uprgraded to Tokio 1.0 or Tokio compat  is used

- [x] bitcoin-harness-rs https://github.com/coblox/bitcoin-harness-rs/pull/20

Co-authored-by: rishflab <rishflab@hotmail.com>
Co-authored-by: Franck Royer <franck@coblox.tech>
This commit is contained in:
bors[bot] 2021-01-29 04:42:23 +00:00 committed by GitHub
commit f84cd001b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
44 changed files with 972 additions and 789 deletions

View File

@ -78,7 +78,6 @@ jobs:
- name: Install Rust toolchain
uses: actions-rs/toolchain@v1
with:
toolchain: nightly
profile: minimal
override: true
@ -87,26 +86,29 @@ jobs:
if: matrix.os == 'ubuntu-latest'
with:
path: target
key: rust-${{ matrix.target }}-${{ matrix.rust_toolchain }}-target-directory-${{ hashFiles('Cargo.lock') }}-v1
key: rust-${{ matrix.target }}-target-directory-${{ hashFiles('Cargo.lock') }}-v1
- name: Cache ~/.cargo/registry directory
uses: actions/cache@v1
with:
path: ~/.cargo/registry
key: rust-${{ matrix.target }}-${{ matrix.rust_toolchain }}-cargo-registry-directory-${{ hashFiles('Cargo.lock') }}-v1
key: rust-${{ matrix.target }}-cargo-registry-directory-${{ hashFiles('Cargo.lock') }}-v1
- name: Cargo check release code with default features
run: cargo +nightly check --workspace
run: cargo check --workspace
- name: Cargo check all features
run: cargo +nightly check --workspace --all-targets --all-features
run: cargo check --workspace --all-targets --all-features
- name: Cargo test
if: (!matrix.skip_tests)
run: cargo +nightly test --workspace --all-features -- -Z unstable-options --report-time
env:
MONERO_ADDITIONAL_SLEEP_PERIOD: 60000
RUST_MIN_STACK: 16777216 # 16 MB. Default is 8MB. This is fine as in tests we start 2 programs: Alice and Bob.
- name: Build tests
run: cargo build --tests --workspace --all-features
- name: Run monero-harness tests
if: matrix.os == 'ubuntu-latest'
run: cargo test --package monero-harness --all-features
- name: Run library tests for swap
run: cargo test --package swap --lib --all-features
- name: Build binary
run: |
@ -117,3 +119,48 @@ jobs:
with:
name: swap-${{ matrix.target }}
path: target/${{ matrix.target }}/debug/swap
docker_tests:
env:
TARGET: x86_64-unknown-linux-gnu
strategy:
matrix:
test_name: [
happy_path,
happy_path_restart_alice,
happy_path_restart_bob_after_comm,
happy_path_restart_bob_after_lock_proof_received,
happy_path_restart_bob_before_comm,
punish,
refund_restart_alice_cancelled,
refund_restart_alice,
]
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/checkout@v2
- name: Install Rust toolchain
uses: actions-rs/toolchain@v1
with:
profile: minimal
override: true
- name: Cache target directory
uses: actions/cache@v1
with:
path: target
key: rust-${{ env.TARGET }}-target-directory-${{ hashFiles('Cargo.lock') }}-v1
- name: Cache ~/.cargo/registry directory
uses: actions/cache@v1
with:
path: ~/.cargo/registry
key: rust-${{ env.TARGET }}-cargo-registry-directory-${{ hashFiles('Cargo.lock') }}-v1
- name: Run test ${{ matrix.test_name }}
run: cargo test --package swap --all-features --test ${{ matrix.test_name }} ""
env:
MONERO_ADDITIONAL_SLEEP_PERIOD: 60000
RUST_MIN_STACK: 16777216 # 16 MB. Default is 8MB. This is fine as in tests we start 2 programs: Alice and Bob.

1074
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,5 +1,13 @@
status = [
"static_analysis",
"build_test (x86_64-unknown-linux-gnu)",
"build_test (x86_64-apple-darwin)"
"build_test (x86_64-apple-darwin)",
"docker_tests (happy_path)",
"docker_tests (happy_path_restart_alice)",
"docker_tests (happy_path_restart_bob_after_comm)",
"docker_tests (happy_path_restart_bob_after_lock_proof_received)",
"docker_tests (happy_path_restart_bob_before_comm)",
"docker_tests (punish)",
"docker_tests (refund_restart_alice_cancelled)",
"docker_tests (refund_restart_alice)",
]

View File

@ -10,12 +10,12 @@ digest_auth = "0.2.3"
futures = "0.3"
port_check = "0.1"
rand = "0.7"
reqwest = { version = "0.10", default-features = false, features = ["json", "native-tls"] }
reqwest = { version = "0.11", default-features = false, features = ["json", "native-tls"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
spectral = "0.6"
testcontainers = "0.11"
tokio = { version = "0.2", default-features = false, features = ["blocking", "macros", "rt-core", "time"] }
tokio = { version = "1.0", default-features = false, features = ["rt-multi-thread", "time", "macros"] }
tracing = "0.1"
tracing-log = "0.1"
tracing-subscriber = { version = "0.2", default-features = false, features = ["fmt", "ansi", "env-filter"] }

View File

@ -266,7 +266,7 @@ impl<'c> MoneroWalletRpc {
// ~30 seconds
bail!("Wallet could not catch up with monerod after 30 retries.")
}
time::delay_for(Duration::from_millis(WAIT_WALLET_SYNC_MILLIS)).await;
time::sleep(Duration::from_millis(WAIT_WALLET_SYNC_MILLIS)).await;
retry += 1;
}
Ok(())
@ -293,7 +293,7 @@ impl<'c> MoneroWalletRpc {
/// Mine a block ever BLOCK_TIME_SECS seconds.
async fn mine(monerod: monerod::Client, reward_address: String) -> Result<()> {
loop {
time::delay_for(Duration::from_secs(BLOCK_TIME_SECS)).await;
time::sleep(Duration::from_secs(BLOCK_TIME_SECS)).await;
monerod.generate_blocks(1, &reward_address).await?;
}
}

View File

@ -22,7 +22,7 @@ async fn init_miner_and_mine_to_miner_address() {
let got_miner_balance = miner_wallet.balance().await.unwrap();
assert_that!(got_miner_balance).is_greater_than(0);
time::delay_for(Duration::from_millis(1010)).await;
time::sleep(Duration::from_millis(1010)).await;
// after a bit more than 1 sec another block should have been mined
let block_height = monerod.client().get_block_count().await.unwrap();

View File

@ -10,10 +10,10 @@ anyhow = "1"
async-recursion = "0.3.1"
async-trait = "0.1"
atty = "0.2"
backoff = { version = "0.2", features = ["tokio"] }
backoff = { git = "https://github.com/ihrwein/backoff", rev = "9d03992a83dfdc596be26276d4e5c5254a4b11a2", features = ["tokio"] }
base64 = "0.12"
bitcoin = { version = "0.25", features = ["rand", "use-serde"] }
bitcoin-harness = { git = "https://github.com/coblox/bitcoin-harness-rs", rev = "864b55fcba2e770105f135781dd2e3002c503d12" }
bitcoin-harness = { git = "https://github.com/coblox/bitcoin-harness-rs", rev = "ae2f6cd547496e680941c0910018bbe884128799" }
conquer-once = "0.3"
cross-curve-dleq = { git = "https://github.com/comit-network/cross-curve-dleq", rev = "eddcdea1d1f16fa33ef581d1744014ece535c920", features = ["serde"] }
curve25519-dalek = "2"
@ -21,8 +21,7 @@ derivative = "2"
ecdsa_fun = { git = "https://github.com/LLFourn/secp256kfun", rev = "cdfbc766045ea678a41780919d6228dd5acee3be", features = ["libsecp_compat", "serde"] }
ed25519-dalek = { version = "1.0.0-pre.4", features = ["serde"] }# Cannot be 1 because they depend on curve25519-dalek version 3
futures = { version = "0.3", default-features = false }
libp2p = { version = "0.29", default-features = false, features = ["tcp-tokio", "yamux", "mplex", "dns", "noise", "request-response"] }
libp2p-tokio-socks5 = "0.4"
libp2p = { version = "0.34", default-features = false, features = ["tcp-tokio", "yamux", "mplex", "dns", "noise", "request-response"] }
log = { version = "0.4", features = ["serde"] }
miniscript = { version = "4", features = ["serde"] }
monero = { version = "0.9", features = ["serde_support"] }
@ -30,7 +29,7 @@ monero-harness = { path = "../monero-harness" }
pem = "0.8"
prettytable-rs = "0.8"
rand = "0.7"
reqwest = { version = "0.10", default-features = false, features = ["socks"] }
reqwest = { version = "0.11", default-features = false }
rust_decimal = "1.8"
serde = { version = "1", features = ["derive"] }
serde_cbor = "0.11"
@ -43,7 +42,7 @@ strum = { version = "0.20", features = ["derive"] }
tempfile = "3"
thiserror = "1"
time = "0.2"
tokio = { version = "0.2", features = ["rt-threaded", "time", "macros", "sync"] }
tokio = { version = "1.0", features = ["rt-multi-thread", "time", "macros", "sync"] }
tracing = { version = "0.1", features = ["attributes"] }
tracing-core = "0.1"
tracing-futures = { version = "0.2", features = ["std-future", "futures-03"] }
@ -55,7 +54,7 @@ void = "1"
[dev-dependencies]
get-port = "3"
hyper = "0.13"
hyper = "0.14"
port_check = "0.1"
serde_cbor = "0.11"
spectral = "0.6"

View File

@ -252,7 +252,7 @@ where
B: GetBlockHeight,
{
while client.get_block_height().await < target {
tokio::time::delay_for(std::time::Duration::from_secs(1)).await;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}

View File

@ -9,7 +9,7 @@ use crate::{
use ::bitcoin::{util::psbt::PartiallySignedTransaction, Txid};
use anyhow::{Context, Result};
use async_trait::async_trait;
use backoff::{backoff::Constant as ConstantBackoff, future::FutureOperation as _};
use backoff::{backoff::Constant as ConstantBackoff, tokio::retry};
use bitcoin_harness::{bitcoind_rpc::PsbtBase64, BitcoindRpcApi};
use reqwest::Url;
use std::time::Duration;
@ -112,10 +112,11 @@ impl BroadcastSignedTransaction for Wallet {
#[async_trait]
impl WatchForRawTransaction for Wallet {
async fn watch_for_raw_transaction(&self, txid: Txid) -> Transaction {
(|| async { Ok(self.inner.get_raw_transaction(txid).await?) })
.retry(ConstantBackoff::new(Duration::from_secs(1)))
.await
.expect("transient errors to be retried")
retry(ConstantBackoff::new(Duration::from_secs(1)), || async {
Ok(self.inner.get_raw_transaction(txid).await?)
})
.await
.expect("transient errors to be retried")
}
}
@ -130,10 +131,11 @@ impl GetRawTransaction for Wallet {
#[async_trait]
impl GetBlockHeight for Wallet {
async fn get_block_height(&self) -> BlockHeight {
let height = (|| async { Ok(self.inner.client.getblockcount().await?) })
.retry(ConstantBackoff::new(Duration::from_secs(1)))
.await
.expect("transient errors to be retried");
let height = retry(ConstantBackoff::new(Duration::from_secs(1)), || async {
Ok(self.inner.client.getblockcount().await?)
})
.await
.expect("transient errors to be retried");
BlockHeight::new(height)
}
@ -148,7 +150,7 @@ impl TransactionBlockHeight for Wallet {
NotYetMined,
}
let height = (|| async {
let height = retry(ConstantBackoff::new(Duration::from_secs(1)), || async {
let block_height = self
.inner
.transaction_block_height(txid)
@ -160,7 +162,6 @@ impl TransactionBlockHeight for Wallet {
Result::<_, backoff::Error<Error>>::Ok(block_height)
})
.retry(ConstantBackoff::new(Duration::from_secs(1)))
.await
.expect("transient errors to be retried");

View File

@ -16,9 +16,23 @@ pub struct Config {
pub monero_network: monero::Network,
}
impl Config {
pub fn mainnet() -> Self {
Self {
// TODO: This trait is not needed
pub trait GetConfig {
fn get_config() -> Config;
}
#[derive(Clone, Copy)]
pub struct Mainnet;
#[derive(Clone, Copy)]
pub struct Testnet;
#[derive(Clone, Copy)]
pub struct Regtest;
impl GetConfig for Mainnet {
fn get_config() -> Config {
Config {
bob_time_to_act: *mainnet::BOB_TIME_TO_ACT,
bitcoin_finality_confirmations: mainnet::BITCOIN_FINALITY_CONFIRMATIONS,
bitcoin_avg_block_time: *mainnet::BITCOIN_AVG_BLOCK_TIME,
@ -29,9 +43,11 @@ impl Config {
monero_network: monero::Network::Mainnet,
}
}
}
pub fn testnet() -> Self {
Self {
impl GetConfig for Testnet {
fn get_config() -> Config {
Config {
bob_time_to_act: *testnet::BOB_TIME_TO_ACT,
bitcoin_finality_confirmations: testnet::BITCOIN_FINALITY_CONFIRMATIONS,
bitcoin_avg_block_time: *testnet::BITCOIN_AVG_BLOCK_TIME,
@ -42,9 +58,11 @@ impl Config {
monero_network: monero::Network::Stagenet,
}
}
}
pub fn regtest() -> Self {
Self {
impl GetConfig for Regtest {
fn get_config() -> Config {
Config {
bob_time_to_act: *regtest::BOB_TIME_TO_ACT,
bitcoin_finality_confirmations: regtest::BITCOIN_FINALITY_CONFIRMATIONS,
bitcoin_avg_block_time: *regtest::BITCOIN_AVG_BLOCK_TIME,
@ -104,7 +122,7 @@ mod regtest {
pub static MONERO_FINALITY_CONFIRMATIONS: u32 = 1;
pub static BITCOIN_CANCEL_TIMELOCK: Timelock = Timelock::new(50);
pub static BITCOIN_CANCEL_TIMELOCK: Timelock = Timelock::new(100);
pub static BITCOIN_PUNISH_TIMELOCK: Timelock = Timelock::new(50);
}

View File

@ -60,7 +60,7 @@ impl From<&AliceState> for Alice {
..
} => Alice::Negotiated {
state3: state3.as_ref().clone(),
bob_peer_id: bob_peer_id.clone(),
bob_peer_id: *bob_peer_id,
},
AliceState::BtcLocked {
state3,
@ -68,7 +68,7 @@ impl From<&AliceState> for Alice {
..
} => Alice::BtcLocked {
state3: state3.as_ref().clone(),
bob_peer_id: bob_peer_id.clone(),
bob_peer_id: *bob_peer_id,
},
AliceState::XmrLocked { state3 } => Alice::XmrLocked(state3.as_ref().clone()),
AliceState::EncSigLearned {

View File

@ -14,14 +14,15 @@
use crate::cli::{Command, Options, Resume};
use anyhow::{Context, Result};
use config::Config;
use config::{Config, GetConfig};
use database::Database;
use log::LevelFilter;
use prettytable::{row, Table};
use protocol::{alice, bob, bob::Builder, SwapAmounts};
use std::sync::Arc;
use structopt::StructOpt;
use trace::init_tracing;
use tracing::{info, log::LevelFilter};
use tracing::info;
use uuid::Uuid;
pub mod bitcoin;
@ -45,7 +46,7 @@ async fn main() -> Result<()> {
init_tracing(LevelFilter::Info).expect("initialize tracing");
let opt = Options::from_args();
let config = Config::testnet();
let config = config::Testnet::get_config();
info!(
"Database and Seed will be stored in directory: {}",
@ -95,8 +96,7 @@ async fn main() -> Result<()> {
Arc::new(monero_wallet),
db_path,
listen_addr,
)
.await;
);
let (swap, mut event_loop) =
alice_factory.with_init_params(swap_amounts).build().await?;
@ -184,8 +184,7 @@ async fn main() -> Result<()> {
Arc::new(monero_wallet),
db_path,
listen_addr,
)
.await;
);
let (swap, mut event_loop) = alice_factory.build().await?;
tokio::spawn(async move { event_loop.run().await });

View File

@ -5,7 +5,7 @@ use crate::monero::{
use ::monero::{Address, Network, PrivateKey, PublicKey};
use anyhow::Result;
use async_trait::async_trait;
use backoff::{backoff::Constant as ConstantBackoff, future::FutureOperation as _};
use backoff::{backoff::Constant as ConstantBackoff, tokio::retry};
use bitcoin::hashes::core::sync::atomic::AtomicU32;
use monero_harness::rpc::wallet;
use std::{
@ -96,7 +96,6 @@ impl CreateWalletForOutput for Wallet {
// TODO: For retry, use `backoff::ExponentialBackoff` in production as opposed
// to `ConstantBackoff`.
#[async_trait]
impl WatchForTransfer for Wallet {
async fn watch_for_transfer(
@ -117,46 +116,41 @@ impl WatchForTransfer for Wallet {
let wallet = self.inner.clone();
let confirmations = Arc::new(AtomicU32::new(0u32));
let res = (move || {
let confirmations = confirmations.clone();
let transfer_proof = transfer_proof.clone();
let wallet = wallet.clone();
async move {
// NOTE: Currently, this is conflicting IO errors with the transaction not being
// in the blockchain yet, or not having enough confirmations on it. All these
// errors warrant a retry, but the strategy should probably differ per case
let proof = wallet
.check_tx_key(
&String::from(transfer_proof.tx_hash()),
&transfer_proof.tx_key().to_string(),
&address.to_string(),
)
.await
.map_err(|_| backoff::Error::Transient(Error::TxNotFound))?;
if proof.received != expected_amount.as_piconero() {
return Err(backoff::Error::Permanent(Error::InsufficientFunds {
expected: expected_amount,
actual: Amount::from_piconero(proof.received),
}));
}
let res = retry(ConstantBackoff::new(Duration::from_secs(1)), || async {
// NOTE: Currently, this is conflicting IO errors with the transaction not being
// in the blockchain yet, or not having enough confirmations on it. All these
// errors warrant a retry, but the strategy should probably differ per case
let proof = wallet
.check_tx_key(
&String::from(transfer_proof.tx_hash()),
&transfer_proof.tx_key().to_string(),
&address.to_string(),
)
.await
.map_err(|_| backoff::Error::Transient(Error::TxNotFound))?;
if proof.confirmations > confirmations.load(Ordering::SeqCst) {
confirmations.store(proof.confirmations, Ordering::SeqCst);
info!(
"Monero lock tx received {} out of {} confirmations",
proof.confirmations, expected_confirmations
);
}
if proof.confirmations < expected_confirmations {
return Err(backoff::Error::Transient(Error::InsufficientConfirmations));
}
Ok(proof)
if proof.received != expected_amount.as_piconero() {
return Err(backoff::Error::Permanent(Error::InsufficientFunds {
expected: expected_amount,
actual: Amount::from_piconero(proof.received),
}));
}
if proof.confirmations > confirmations.load(Ordering::SeqCst) {
confirmations.store(proof.confirmations, Ordering::SeqCst);
info!(
"Monero lock tx received {} out of {} confirmations",
proof.confirmations, expected_confirmations
);
}
if proof.confirmations < expected_confirmations {
return Err(backoff::Error::Transient(Error::InsufficientConfirmations));
}
Ok(proof)
})
.retry(ConstantBackoff::new(Duration::from_secs(1)))
.await;
if let Err(Error::InsufficientFunds { expected, actual }) = res {

View File

@ -12,7 +12,7 @@ use std::{
task::Poll,
};
#[derive(Debug)]
#[derive(Debug, Copy, Clone)]
pub enum OutEvent {
ConnectionEstablished(PeerId),
}
@ -42,7 +42,7 @@ impl PeerTracker {
/// Returns the peer id of counterparty if we are connected.
pub fn counterparty_peer_id(&self) -> Option<PeerId> {
if let Some((id, _)) = &self.connected {
return Some(id.clone());
return Some(*id);
}
None
}
@ -50,7 +50,7 @@ impl PeerTracker {
/// Returns the peer_id and multiaddr of counterparty if we are connected.
pub fn counterparty(&self) -> Option<(PeerId, Multiaddr)> {
if let Some((peer_id, addr)) = &self.connected {
return Some((peer_id.clone(), addr.clone()));
return Some((*peer_id, addr.clone()));
}
None
}
@ -97,18 +97,18 @@ impl NetworkBehaviour for PeerTracker {
) {
match point {
ConnectedPoint::Dialer { address } => {
self.connected = Some((peer.clone(), address.clone()));
self.connected = Some((*peer, address.clone()));
}
ConnectedPoint::Listener {
local_addr: _,
send_back_addr,
} => {
self.connected = Some((peer.clone(), send_back_addr.clone()));
self.connected = Some((*peer, send_back_addr.clone()));
}
}
self.events
.push_back(OutEvent::ConnectionEstablished(peer.clone()));
.push_back(OutEvent::ConnectionEstablished(*peer));
}
fn inject_connection_closed(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {

View File

@ -7,7 +7,6 @@ use libp2p::{
};
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, io, marker::PhantomData};
use tracing::debug;
/// Time to wait for a response back once we send a request.
pub const TIMEOUT: u64 = 3600; // One hour.
@ -123,7 +122,6 @@ where
where
T: AsyncRead + Unpin + Send,
{
debug!("enter read_request");
let message = upgrade::read_one(io, BUF_SIZE)
.await
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
@ -144,7 +142,6 @@ where
where
T: AsyncRead + Unpin + Send,
{
debug!("enter read_response");
let message = upgrade::read_one(io, BUF_SIZE)
.await
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
@ -183,7 +180,6 @@ where
where
T: AsyncWrite + Unpin + Send,
{
debug!("enter write_response");
let bytes = serde_cbor::to_vec(&res).map_err(|e| {
tracing::debug!("serde write_reponse error: {:?}", e);
io::Error::new(io::ErrorKind::InvalidData, e)
@ -212,7 +208,6 @@ where
where
T: AsyncRead + Unpin + Send,
{
debug!("enter read_request");
let message = upgrade::read_one(io, BUF_SIZE).await.map_err(|e| match e {
ReadOneError::Io(err) => err,
e => io::Error::new(io::ErrorKind::Other, e),
@ -234,7 +229,6 @@ where
where
T: AsyncRead + Unpin + Send,
{
debug!("enter read_response");
let message = upgrade::read_one(io, BUF_SIZE)
.await
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
@ -273,7 +267,6 @@ where
where
T: AsyncWrite + Unpin + Send,
{
debug!("enter write_response");
let bytes = serde_cbor::to_vec(&res).map_err(|e| {
tracing::debug!("serde write_reponse error: {:?}", e);
io::Error::new(io::ErrorKind::InvalidData, e)

View File

@ -31,7 +31,7 @@ pub fn build(id_keys: identity::Keypair) -> Result<SwapTransport> {
.upgrade(Version::V1)
.authenticate(noise)
.multiplex(SelectUpgrade::new(
yamux::Config::default(),
yamux::YamuxConfig::default(),
MplexConfig::new(),
))
.map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer)))

View File

@ -75,7 +75,7 @@ enum InitParams {
}
impl Builder {
pub async fn new(
pub fn new(
seed: Seed,
config: Config,
swap_id: Uuid,
@ -165,7 +165,7 @@ impl Builder {
}
pub fn peer_id(&self) -> PeerId {
self.peer_id.clone()
self.peer_id
}
pub fn listen_address(&self) -> Multiaddr {
@ -210,8 +210,8 @@ impl Builder {
EventLoop::new(
alice_transport,
alice_behaviour,
self.listen_address.clone(),
self.peer_id.clone(),
self.listen_address(),
self.peer_id,
)
}
}
@ -236,7 +236,7 @@ pub enum OutEvent {
msg: Box<bob::Message2>,
bob_peer_id: PeerId,
},
TransferProof,
TransferProofAcknowledged,
EncryptedSignature(EncryptedSignature),
}
@ -289,7 +289,7 @@ impl From<message2::OutEvent> for OutEvent {
impl From<transfer_proof::OutEvent> for OutEvent {
fn from(event: transfer_proof::OutEvent) -> Self {
match event {
transfer_proof::OutEvent::Msg => OutEvent::TransferProof,
transfer_proof::OutEvent::Acknowledged => OutEvent::TransferProofAcknowledged,
}
}
}
@ -322,21 +322,32 @@ impl Behaviour {
&mut self,
channel: ResponseChannel<AliceToBob>,
swap_response: SwapResponse,
) {
self.amounts.send(channel, swap_response);
) -> Result<()> {
self.amounts.send(channel, swap_response)?;
info!("Sent amounts response");
Ok(())
}
/// Send Message0 to Bob in response to receiving his Message0.
pub fn send_message0(&mut self, channel: ResponseChannel<AliceToBob>, msg: Message0) {
self.message0.send(channel, msg);
pub fn send_message0(
&mut self,
channel: ResponseChannel<AliceToBob>,
msg: Message0,
) -> Result<()> {
self.message0.send(channel, msg)?;
debug!("Sent Message0");
Ok(())
}
/// Send Message1 to Bob in response to receiving his Message1.
pub fn send_message1(&mut self, channel: ResponseChannel<AliceToBob>, msg: Message1) {
self.message1.send(channel, msg);
pub fn send_message1(
&mut self,
channel: ResponseChannel<AliceToBob>,
msg: Message1,
) -> Result<()> {
self.message1.send(channel, msg)?;
debug!("Sent Message1");
Ok(())
}
/// Send Transfer Proof to Bob.

View File

@ -82,7 +82,10 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<Request, Response>> for B
debug!("Received encrypted signature");
self.events.push_back(OutEvent::Msg(*msg));
// Send back empty response so that the request/response protocol completes.
let _ = self.rr.send_response(channel, Response::EncryptedSignature);
if let Err(error) = self.rr.send_response(channel, Response::EncryptedSignature)
{
error!("Failed to send Encrypted Signature ack: {:?}", error);
}
}
}
RequestResponseEvent::Message {
@ -95,6 +98,9 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<Request, Response>> for B
RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error);
}
RequestResponseEvent::ResponseSent { .. } => {
debug!("Alice has sent an Message3 response to Bob");
}
}
}
}

View File

@ -8,12 +8,11 @@ use crate::{
},
};
use anyhow::{anyhow, Context, Result};
use futures::FutureExt;
use libp2p::{
core::Multiaddr, futures::StreamExt, request_response::ResponseChannel, PeerId, Swarm,
core::Multiaddr, futures::FutureExt, request_response::ResponseChannel, PeerId, Swarm,
};
use tokio::sync::mpsc::{Receiver, Sender};
use tracing::trace;
use tracing::{error, trace};
#[allow(missing_debug_implementations)]
pub struct Channels<T> {
@ -46,6 +45,7 @@ pub struct EventLoopHandle {
send_message0: Sender<(ResponseChannel<AliceToBob>, alice::Message0)>,
send_message1: Sender<(ResponseChannel<AliceToBob>, alice::Message1)>,
send_transfer_proof: Sender<(PeerId, TransferProof)>,
recv_transfer_proof_ack: Receiver<()>,
}
impl EventLoopHandle {
@ -125,6 +125,11 @@ impl EventLoopHandle {
pub async fn send_transfer_proof(&mut self, bob: PeerId, msg: TransferProof) -> Result<()> {
let _ = self.send_transfer_proof.send((bob, msg)).await?;
self.recv_transfer_proof_ack
.recv()
.await
.ok_or_else(|| anyhow!("Failed to receive transfer proof ack from Bob"))?;
Ok(())
}
}
@ -142,6 +147,7 @@ pub struct EventLoop {
send_message0: Receiver<(ResponseChannel<AliceToBob>, alice::Message0)>,
send_message1: Receiver<(ResponseChannel<AliceToBob>, alice::Message1)>,
send_transfer_proof: Receiver<(PeerId, TransferProof)>,
recv_transfer_proof_ack: Sender<()>,
}
impl EventLoop {
@ -170,6 +176,7 @@ impl EventLoop {
let send_message0 = Channels::new();
let send_message1 = Channels::new();
let send_transfer_proof = Channels::new();
let recv_transfer_proof_ack = Channels::new();
let driver = EventLoop {
swarm,
@ -183,6 +190,7 @@ impl EventLoop {
send_message0: send_message0.receiver,
send_message1: send_message1.receiver,
send_transfer_proof: send_transfer_proof.receiver,
recv_transfer_proof_ack: recv_transfer_proof_ack.sender,
};
let handle = EventLoopHandle {
@ -196,6 +204,7 @@ impl EventLoop {
send_message0: send_message0.sender,
send_message1: send_message1.sender,
send_transfer_proof: send_transfer_proof.sender,
recv_transfer_proof_ack: recv_transfer_proof_ack.receiver,
};
Ok((driver, handle))
@ -218,7 +227,10 @@ impl EventLoop {
OutEvent::Message2 { msg, bob_peer_id : _} => {
let _ = self.recv_message2.send(*msg).await;
}
OutEvent::TransferProof => trace!("Bob ack'd receiving the transfer proof"),
OutEvent::TransferProofAcknowledged => {
trace!("Bob acknowledged transfer proof");
let _ = self.recv_transfer_proof_ack.send(()).await;
}
OutEvent::EncryptedSignature(msg) => {
let _ = self.recv_encrypted_signature.send(msg).await;
}
@ -227,24 +239,33 @@ impl EventLoop {
}
}
},
swap_response = self.send_swap_response.next().fuse() => {
swap_response = self.send_swap_response.recv().fuse() => {
if let Some((channel, swap_response)) = swap_response {
self.swarm.send_swap_response(channel, swap_response);
let _ = self
.swarm
.send_swap_response(channel, swap_response)
.map_err(|err|error!("Failed to send swap response: {:#}", err));
}
},
msg0 = self.send_message0.next().fuse() => {
msg0 = self.send_message0.recv().fuse() => {
if let Some((channel, msg)) = msg0 {
self.swarm.send_message0(channel, msg);
let _ = self
.swarm
.send_message0(channel, msg)
.map_err(|err|error!("Failed to send message0: {:#}", err));
}
},
msg1 = self.send_message1.next().fuse() => {
msg1 = self.send_message1.recv().fuse() => {
if let Some((channel, msg)) = msg1 {
self.swarm.send_message1(channel, msg);
let _ = self
.swarm
.send_message1(channel, msg)
.map_err(|err|error!("Failed to send message1: {:#}", err));
}
},
transfer_proof = self.send_transfer_proof.next().fuse() => {
transfer_proof = self.send_transfer_proof.recv().fuse() => {
if let Some((bob_peer_id, msg)) = transfer_proof {
self.swarm.send_transfer_proof(bob_peer_id, msg);
self.swarm.send_transfer_proof(bob_peer_id, msg)
}
},
}

View File

@ -3,6 +3,7 @@ use crate::{
network::request_response::{AliceToBob, BobToAlice, Codec, Message0Protocol, TIMEOUT},
protocol::bob,
};
use anyhow::{anyhow, Result};
use libp2p::{
request_response::{
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig,
@ -49,9 +50,11 @@ pub struct Behaviour {
}
impl Behaviour {
pub fn send(&mut self, channel: ResponseChannel<AliceToBob>, msg: Message0) {
pub fn send(&mut self, channel: ResponseChannel<AliceToBob>, msg: Message0) -> Result<()> {
let msg = AliceToBob::Message0(Box::new(msg));
self.rr.send_response(channel, msg);
self.rr
.send_response(channel, msg)
.map_err(|_| anyhow!("Sending Amounts response failed"))
}
fn poll(
&mut self,
@ -108,6 +111,9 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error);
}
RequestResponseEvent::ResponseSent { .. } => {
debug!("Alice has sent an Message0 response to Bob");
}
}
}
}

View File

@ -2,6 +2,7 @@ use crate::{
network::request_response::{AliceToBob, BobToAlice, Codec, Message1Protocol, TIMEOUT},
protocol::bob,
};
use anyhow::{anyhow, Result};
use ecdsa_fun::{adaptor::EncryptedSignature, Signature};
use libp2p::{
request_response::{
@ -46,9 +47,11 @@ pub struct Behaviour {
}
impl Behaviour {
pub fn send(&mut self, channel: ResponseChannel<AliceToBob>, msg: Message1) {
pub fn send(&mut self, channel: ResponseChannel<AliceToBob>, msg: Message1) -> Result<()> {
let msg = AliceToBob::Message1(Box::new(msg));
self.rr.send_response(channel, msg);
self.rr
.send_response(channel, msg)
.map_err(|_| anyhow!("Sending Amounts response failed"))
}
fn poll(
@ -106,6 +109,9 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error);
}
RequestResponseEvent::ResponseSent { .. } => {
debug!("Alice has sent an Message1 response to Bob");
}
}
}
}

View File

@ -96,6 +96,9 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error);
}
RequestResponseEvent::ResponseSent { .. } => {
debug!("Alice has sent an Message2 response to Bob");
}
}
}
}

View File

@ -55,6 +55,7 @@ pub async fn run(swap: alice::Swap) -> Result<AliceState> {
run_until(swap, is_complete).await
}
#[tracing::instrument(name = "swap", skip(swap,is_target_state), fields(id = %swap.swap_id))]
pub async fn run_until(
swap: alice::Swap,
is_target_state: fn(&AliceState) -> bool,

View File

@ -1,3 +1,4 @@
use anyhow::{anyhow, Result};
use libp2p::{
request_response::{
handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig,
@ -44,9 +45,11 @@ pub struct Behaviour {
impl Behaviour {
/// Alice always sends her messages as a response to a request from Bob.
pub fn send(&mut self, channel: ResponseChannel<AliceToBob>, msg: SwapResponse) {
pub fn send(&mut self, channel: ResponseChannel<AliceToBob>, msg: SwapResponse) -> Result<()> {
let msg = AliceToBob::SwapResponse(Box::new(msg));
self.rr.send_response(channel, msg);
self.rr
.send_response(channel, msg)
.map_err(|_| anyhow!("Sending swap response failed"))
}
fn poll(
@ -105,6 +108,9 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error);
}
RequestResponseEvent::ResponseSent { .. } => {
debug!("Alice has sent an Amounts response to Bob");
}
}
}
}

View File

@ -25,7 +25,7 @@ pub struct TransferProof {
#[derive(Debug, Copy, Clone)]
pub enum OutEvent {
Msg,
Acknowledged,
}
/// A `NetworkBehaviour` that represents sending the Monero transfer proof to
@ -88,7 +88,7 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<Request, Response>> for B
..
} => {
if let Response::TransferProof = response {
self.events.push_back(OutEvent::Msg);
self.events.push_back(OutEvent::Acknowledged);
}
}
RequestResponseEvent::InboundFailure { error, .. } => {
@ -97,6 +97,7 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<Request, Response>> for B
RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error);
}
RequestResponseEvent::ResponseSent { .. } => {}
}
}
}

View File

@ -173,8 +173,8 @@ impl Builder {
bob::event_loop::EventLoop::new(
bob_transport,
bob_behaviour,
self.peer_id.clone(),
self.alice_peer_id.clone(),
self.peer_id,
self.alice_peer_id,
self.alice_address.clone(),
)
}
@ -213,7 +213,7 @@ pub enum OutEvent {
Message1(Box<alice::Message1>),
Message2,
TransferProof(Box<TransferProof>),
EncryptedSignature,
EncryptedSignatureAcknowledged,
}
impl From<peer_tracker::OutEvent> for OutEvent {
@ -267,7 +267,7 @@ impl From<transfer_proof::OutEvent> for OutEvent {
impl From<encrypted_signature::OutEvent> for OutEvent {
fn from(event: encrypted_signature::OutEvent) -> Self {
match event {
encrypted_signature::OutEvent::Msg => OutEvent::EncryptedSignature,
encrypted_signature::OutEvent::Acknowledged => OutEvent::EncryptedSignatureAcknowledged,
}
}
}
@ -289,26 +289,26 @@ pub struct Behaviour {
impl Behaviour {
/// Sends a swap request to Alice to negotiate the swap.
pub fn send_swap_request(&mut self, alice: PeerId, swap_request: SwapRequest) {
let _id = self.swap_request.send(alice.clone(), swap_request);
let _id = self.swap_request.send(alice, swap_request);
info!("Requesting swap from: {}", alice);
}
/// Sends Bob's first message to Alice.
pub fn send_message0(&mut self, alice: PeerId, msg: bob::Message0) {
self.message0.send(alice, msg);
debug!("Sent Message0");
debug!("Message0 sent");
}
/// Sends Bob's second message to Alice.
pub fn send_message1(&mut self, alice: PeerId, msg: bob::Message1) {
self.message1.send(alice, msg);
debug!("Sent Message1");
debug!("Message1 sent");
}
/// Sends Bob's third message to Alice.
pub fn send_message2(&mut self, alice: PeerId, msg: bob::Message2) {
self.message2.send(alice, msg);
debug!("Sent Message2");
debug!("Message2 sent");
}
/// Sends Bob's fourth message to Alice.
@ -319,7 +319,7 @@ impl Behaviour {
) {
let msg = EncryptedSignature { tx_redeem_encsig };
self.encrypted_signature.send(alice, msg);
debug!("Sent Message3");
debug!("Encrypted signature sent");
}
/// Add a known address for the given peer

View File

@ -24,7 +24,7 @@ pub struct EncryptedSignature {
#[derive(Debug, Copy, Clone)]
pub enum OutEvent {
Msg,
Acknowledged,
}
/// A `NetworkBehaviour` that represents sending encrypted signature to Alice.
@ -87,7 +87,7 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<Request, Response>> for B
..
} => {
if let Response::EncryptedSignature = response {
self.events.push_back(OutEvent::Msg);
self.events.push_back(OutEvent::Acknowledged);
}
}
RequestResponseEvent::InboundFailure { error, .. } => {
@ -96,6 +96,9 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<Request, Response>> for B
RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error);
}
RequestResponseEvent::ResponseSent { .. } => {
unreachable!("Bob should never send a Amounts response to Alice");
}
}
}
}

View File

@ -10,10 +10,7 @@ use crate::{
use anyhow::{anyhow, Result};
use futures::FutureExt;
use libp2p::{core::Multiaddr, PeerId};
use tokio::{
stream::StreamExt,
sync::mpsc::{Receiver, Sender},
};
use tokio::sync::mpsc::{Receiver, Sender};
use tracing::{debug, error, info};
#[derive(Debug)]
@ -48,6 +45,7 @@ pub struct EventLoopHandle {
send_message1: Sender<bob::Message1>,
send_message2: Sender<bob::Message2>,
send_encrypted_signature: Sender<EncryptedSignature>,
recv_encrypted_signature_ack: Receiver<()>,
}
impl EventLoopHandle {
@ -117,7 +115,12 @@ impl EventLoopHandle {
&mut self,
tx_redeem_encsig: EncryptedSignature,
) -> Result<()> {
let _ = self.send_encrypted_signature.send(tx_redeem_encsig).await?;
self.send_encrypted_signature.send(tx_redeem_encsig).await?;
self.recv_encrypted_signature_ack
.recv()
.await
.ok_or_else(|| anyhow!("Failed to receive encrypted signature ack from Alice"))?;
Ok(())
}
}
@ -137,6 +140,7 @@ pub struct EventLoop {
send_message1: Receiver<bob::Message1>,
send_message2: Receiver<bob::Message2>,
send_encrypted_signature: Receiver<EncryptedSignature>,
recv_encrypted_signature_ack: Sender<()>,
}
impl EventLoop {
@ -153,7 +157,7 @@ impl EventLoop {
}))
.build();
swarm.add_address(alice_peer_id.clone(), alice_addr);
swarm.add_address(alice_peer_id, alice_addr);
let swap_response = Channels::new();
let recv_message0 = Channels::new();
@ -166,6 +170,7 @@ impl EventLoop {
let send_message1 = Channels::new();
let send_message2 = Channels::new();
let send_encrypted_signature = Channels::new();
let recv_encrypted_signature_ack = Channels::new();
let event_loop = EventLoop {
swarm,
@ -181,6 +186,7 @@ impl EventLoop {
send_message1: send_message1.receiver,
send_message2: send_message2.receiver,
send_encrypted_signature: send_encrypted_signature.receiver,
recv_encrypted_signature_ack: recv_encrypted_signature_ack.sender,
};
let handle = EventLoopHandle {
@ -195,6 +201,7 @@ impl EventLoop {
send_message1: send_message1.sender,
send_message2: send_message2.sender,
send_encrypted_signature: send_encrypted_signature.sender,
recv_encrypted_signature_ack: recv_encrypted_signature_ack.receiver,
};
Ok((event_loop, handle))
@ -221,12 +228,15 @@ impl EventLoop {
OutEvent::TransferProof(msg) => {
let _ = self.recv_transfer_proof.send(*msg).await;
}
OutEvent::EncryptedSignature => info!("Alice acknowledged encrypted signature received"),
OutEvent::EncryptedSignatureAcknowledged => {
debug!("Alice acknowledged encrypted signature");
let _ = self.recv_encrypted_signature_ack.send(()).await;
}
}
},
option = self.dial_alice.next().fuse() => {
option = self.dial_alice.recv().fuse() => {
if option.is_some() {
let peer_id = self.alice_peer_id.clone();
let peer_id = self.alice_peer_id;
if self.swarm.pt.is_connected(&peer_id) {
debug!("Already connected to Alice: {}", peer_id);
let _ = self.conn_established.send(peer_id).await;
@ -240,31 +250,31 @@ impl EventLoop {
}
}
},
swap_request = self.send_swap_request.next().fuse() => {
swap_request = self.send_swap_request.recv().fuse() => {
if let Some(swap_request) = swap_request {
self.swarm.send_swap_request(self.alice_peer_id.clone(), swap_request);
self.swarm.send_swap_request(self.alice_peer_id, swap_request);
}
},
msg0 = self.send_message0.next().fuse() => {
msg0 = self.send_message0.recv().fuse() => {
if let Some(msg) = msg0 {
self.swarm.send_message0(self.alice_peer_id.clone(), msg);
self.swarm.send_message0(self.alice_peer_id, msg);
}
}
msg1 = self.send_message1.next().fuse() => {
msg1 = self.send_message1.recv().fuse() => {
if let Some(msg) = msg1 {
self.swarm.send_message1(self.alice_peer_id.clone(), msg);
self.swarm.send_message1(self.alice_peer_id, msg);
}
},
msg2 = self.send_message2.next().fuse() => {
msg2 = self.send_message2.recv().fuse() => {
if let Some(msg) = msg2 {
self.swarm.send_message2(self.alice_peer_id.clone(), msg);
self.swarm.send_message2(self.alice_peer_id, msg);
}
},
encrypted_signature = self.send_encrypted_signature.next().fuse() => {
encrypted_signature = self.send_encrypted_signature.recv().fuse() => {
if let Some(tx_redeem_encsig) = encrypted_signature {
self.swarm.send_encrypted_signature(self.alice_peer_id.clone(), tx_redeem_encsig);
self.swarm.send_encrypted_signature(self.alice_peer_id, tx_redeem_encsig);
}
}
}

View File

@ -102,6 +102,9 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error);
}
RequestResponseEvent::ResponseSent { .. } => {
unreachable!("Bob should never send a Amounts response to Alice");
}
}
}
}

View File

@ -97,6 +97,9 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error);
}
RequestResponseEvent::ResponseSent { .. } => {
unreachable!("Bob should never send a Amounts response to Alice");
}
}
}
}

View File

@ -95,6 +95,9 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error);
}
RequestResponseEvent::ResponseSent { .. } => {
unreachable!("Bob should never send a Amounts response to Alice");
}
}
}
}

View File

@ -32,6 +32,7 @@ pub async fn run(swap: bob::Swap) -> Result<BobState> {
run_until(swap, is_complete).await
}
#[tracing::instrument(name = "swap", skip(swap,is_target_state), fields(id = %swap.swap_id))]
pub async fn run_until(
swap: bob::Swap,
is_target_state: fn(&BobState) -> bool,

View File

@ -103,6 +103,9 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<BobToAlice, AliceToBob>>
RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error);
}
RequestResponseEvent::ResponseSent { .. } => {
error!("Bob should never send a Amounts response to Alice");
}
}
}
}

View File

@ -92,6 +92,7 @@ impl NetworkBehaviourEventProcess<RequestResponseEvent<Request, Response>> for B
RequestResponseEvent::OutboundFailure { error, .. } => {
error!("Outbound failure: {:?}", error);
}
RequestResponseEvent::ResponseSent { .. } => debug!("Bob ack'd transfer proof message"),
}
}
}

View File

@ -4,7 +4,7 @@ use tracing::{info, subscriber};
use tracing_log::LogTracer;
use tracing_subscriber::FmtSubscriber;
pub fn init_tracing(level: log::LevelFilter) -> anyhow::Result<()> {
pub fn init_tracing(level: LevelFilter) -> anyhow::Result<()> {
if level == LevelFilter::Off {
return Ok(());
}
@ -15,8 +15,8 @@ pub fn init_tracing(level: log::LevelFilter) -> anyhow::Result<()> {
let is_terminal = atty::is(atty::Stream::Stderr);
let subscriber = FmtSubscriber::builder()
.with_env_filter(format!(
"swap={},xmr_btc={},monero_harness={},bitcoin_harness={},http=warn,warp=warn",
level, level, level, level
"swap={},monero_harness={},bitcoin_harness={},http=warn,warp=warn",
level, level, level,
))
.with_writer(std::io::stderr)
.with_ansi(is_terminal)

View File

@ -1,19 +1,20 @@
pub mod testutils;
use swap::protocol::{alice, bob};
use testutils::SlowCancelConfig;
use tokio::join;
/// Run the following tests with RUST_MIN_STACK=10000000
#[tokio::test]
async fn happy_path() {
testutils::setup_test(|mut ctx| async move {
let alice_swap = ctx.new_swap_as_alice().await;
let bob_swap = ctx.new_swap_as_bob().await;
testutils::setup_test(SlowCancelConfig, |mut ctx| async move {
let (alice_swap, _) = ctx.new_swap_as_alice().await;
let (bob_swap, _) = ctx.new_swap_as_bob().await;
let alice = alice::run(alice_swap);
let bob = bob::run(bob_swap);
let (alice_state, bob_state) = join!(alice, bob);
ctx.assert_alice_redeemed(alice_state.unwrap()).await;

View File

@ -1,13 +1,13 @@
pub mod testutils;
use swap::protocol::{alice, alice::AliceState, bob};
use testutils::alice_run_until::is_encsig_learned;
use testutils::{alice_run_until::is_encsig_learned, SlowCancelConfig};
#[tokio::test]
async fn given_alice_restarts_after_encsig_is_learned_resume_swap() {
testutils::setup_test(|mut ctx| async move {
let alice_swap = ctx.new_swap_as_alice().await;
let bob_swap = ctx.new_swap_as_bob().await;
testutils::setup_test(SlowCancelConfig, |mut ctx| async move {
let (alice_swap, alice_join_handle) = ctx.new_swap_as_alice().await;
let (bob_swap, _) = ctx.new_swap_as_bob().await;
let bob = bob::run(bob_swap);
let bob_handle = tokio::spawn(bob);
@ -17,7 +17,7 @@ async fn given_alice_restarts_after_encsig_is_learned_resume_swap() {
.unwrap();
assert!(matches!(alice_state, AliceState::EncSigLearned {..}));
let alice_swap = ctx.recover_alice_from_db().await;
let alice_swap = ctx.stop_and_resume_alice_from_db(alice_join_handle).await;
assert!(matches!(alice_swap.state, AliceState::EncSigLearned {..}));
let alice_state = alice::run(alice_swap).await.unwrap();

View File

@ -1,13 +1,13 @@
pub mod testutils;
use swap::protocol::{alice, bob, bob::BobState};
use testutils::bob_run_until::is_encsig_sent;
use testutils::{bob_run_until::is_encsig_sent, SlowCancelConfig};
#[tokio::test]
async fn given_bob_restarts_after_encsig_is_sent_resume_swap() {
testutils::setup_test(|mut ctx| async move {
let alice_swap = ctx.new_swap_as_alice().await;
let bob_swap = ctx.new_swap_as_bob().await;
testutils::setup_test(SlowCancelConfig, |mut ctx| async move {
let (alice_swap, _) = ctx.new_swap_as_alice().await;
let (bob_swap, bob_join_handle) = ctx.new_swap_as_bob().await;
let alice = alice::run(alice_swap);
let alice_handle = tokio::spawn(alice);
@ -16,7 +16,7 @@ async fn given_bob_restarts_after_encsig_is_sent_resume_swap() {
assert!(matches!(bob_state, BobState::EncSigSent {..}));
let bob_swap = ctx.recover_bob_from_db().await;
let bob_swap = ctx.stop_and_resume_bob_from_db(bob_join_handle).await;
assert!(matches!(bob_swap.state, BobState::EncSigSent {..}));
let bob_state = bob::run(bob_swap).await.unwrap();

View File

@ -1,13 +1,13 @@
pub mod testutils;
use swap::protocol::{alice, bob, bob::BobState};
use testutils::bob_run_until::is_lock_proof_received;
use testutils::{bob_run_until::is_lock_proof_received, SlowCancelConfig};
#[tokio::test]
async fn given_bob_restarts_after_lock_proof_received_resume_swap() {
testutils::setup_test(|mut ctx| async move {
let alice_swap = ctx.new_swap_as_alice().await;
let bob_swap = ctx.new_swap_as_bob().await;
testutils::setup_test(SlowCancelConfig, |mut ctx| async move {
let (alice_swap, _) = ctx.new_swap_as_alice().await;
let (bob_swap, bob_join_handle) = ctx.new_swap_as_bob().await;
let alice_handle = alice::run(alice_swap);
let alice_swap_handle = tokio::spawn(alice_handle);
@ -18,8 +18,9 @@ async fn given_bob_restarts_after_lock_proof_received_resume_swap() {
assert!(matches!(bob_state, BobState::XmrLockProofReceived {..}));
let bob_swap = ctx.recover_bob_from_db().await;
assert!(matches!(bob_swap.state, BobState::XmrLockProofReceived {..}));
let bob_swap = ctx.stop_and_resume_bob_from_db(bob_join_handle).await;
assert!(matches!(bob_swap.state, BobState::XmrLockProofReceived
{..}));
let bob_state = bob::run(bob_swap).await.unwrap();

View File

@ -1,13 +1,13 @@
pub mod testutils;
use swap::protocol::{alice, bob, bob::BobState};
use testutils::bob_run_until::is_xmr_locked;
use testutils::{bob_run_until::is_xmr_locked, SlowCancelConfig};
#[tokio::test]
async fn given_bob_restarts_after_xmr_is_locked_resume_swap() {
testutils::setup_test(|mut ctx| async move {
let alice_swap = ctx.new_swap_as_alice().await;
let bob_swap = ctx.new_swap_as_bob().await;
testutils::setup_test(SlowCancelConfig, |mut ctx| async move {
let (alice_swap, _) = ctx.new_swap_as_alice().await;
let (bob_swap, bob_join_handle) = ctx.new_swap_as_bob().await;
let alice_handle = alice::run(alice_swap);
let alice_swap_handle = tokio::spawn(alice_handle);
@ -16,7 +16,7 @@ async fn given_bob_restarts_after_xmr_is_locked_resume_swap() {
assert!(matches!(bob_state, BobState::XmrLocked {..}));
let bob_swap = ctx.recover_bob_from_db().await;
let bob_swap = ctx.stop_and_resume_bob_from_db(bob_join_handle).await;
assert!(matches!(bob_swap.state, BobState::XmrLocked {..}));
let bob_state = bob::run(bob_swap).await.unwrap();

View File

@ -1,15 +1,15 @@
pub mod testutils;
use swap::protocol::{alice, bob, bob::BobState};
use testutils::bob_run_until::is_btc_locked;
use testutils::{bob_run_until::is_btc_locked, FastPunishConfig};
/// Bob locks Btc and Alice locks Xmr. Bob does not act; he fails to send Alice
/// the encsig and fail to refund or redeem. Alice punishes.
#[tokio::test]
async fn alice_punishes_if_bob_never_acts_after_fund() {
testutils::setup_test(|mut ctx| async move {
let alice_swap = ctx.new_swap_as_alice().await;
let bob_swap = ctx.new_swap_as_bob().await;
testutils::setup_test(FastPunishConfig, |mut ctx| async move {
let (alice_swap, _) = ctx.new_swap_as_alice().await;
let (bob_swap, bob_join_handle) = ctx.new_swap_as_bob().await;
let alice = alice::run(alice_swap);
let alice_handle = tokio::spawn(alice);
@ -23,7 +23,7 @@ async fn alice_punishes_if_bob_never_acts_after_fund() {
// Restart Bob after Alice punished to ensure Bob transitions to
// punished and does not run indefinitely
let bob_swap = ctx.recover_bob_from_db().await;
let bob_swap = ctx.stop_and_resume_bob_from_db(bob_join_handle).await;
assert!(matches!(bob_swap.state, BobState::BtcLocked {..}));
let bob_state = bob::run(bob_swap).await.unwrap();

View File

@ -1,28 +1,29 @@
pub mod testutils;
use swap::protocol::{alice, alice::AliceState, bob};
use testutils::alice_run_until::is_xmr_locked;
use testutils::{alice_run_until::is_xmr_locked, FastCancelConfig};
/// Bob locks btc and Alice locks xmr. Alice fails to act so Bob refunds. Alice
/// then also refunds.
#[tokio::test]
async fn given_alice_restarts_after_xmr_is_locked_refund_swap() {
testutils::setup_test(|mut ctx| async move {
let alice_swap = ctx.new_swap_as_alice().await;
let bob_swap = ctx.new_swap_as_bob().await;
testutils::setup_test(FastCancelConfig, |mut ctx| async move {
let (alice_swap, alice_join_handle) = ctx.new_swap_as_alice().await;
let (bob_swap, _) = ctx.new_swap_as_bob().await;
let bob = bob::run(bob_swap);
let bob_handle = tokio::spawn(bob);
let alice_state = alice::run_until(alice_swap, is_xmr_locked).await.unwrap();
assert!(matches!(alice_state, AliceState::XmrLocked {..}));
assert!(matches!(alice_state,
AliceState::XmrLocked {..}));
// Alice does not act, Bob refunds
let bob_state = bob_handle.await.unwrap();
ctx.assert_bob_refunded(bob_state.unwrap()).await;
// Once bob has finished Alice is restarted and refunds as well
let alice_swap = ctx.recover_alice_from_db().await;
let alice_swap = ctx.stop_and_resume_alice_from_db(alice_join_handle).await;
assert!(matches!(alice_swap.state, AliceState::XmrLocked {..}));
let alice_state = alice::run(alice_swap).await.unwrap();

View File

@ -1,6 +1,9 @@
pub mod testutils;
use swap::protocol::{alice, alice::AliceState, bob};
use swap::{
config,
protocol::{alice, alice::AliceState, bob},
};
use testutils::alice_run_until::is_encsig_learned;
/// Bob locks btc and Alice locks xmr. Alice fails to act so Bob refunds. Alice
@ -8,9 +11,9 @@ use testutils::alice_run_until::is_encsig_learned;
/// redeem had the timelock not expired.
#[tokio::test]
async fn given_alice_restarts_after_enc_sig_learned_and_bob_already_cancelled_refund_swap() {
testutils::setup_test(|mut ctx| async move {
let alice_swap = ctx.new_swap_as_alice().await;
let bob_swap = ctx.new_swap_as_bob().await;
testutils::setup_test(config::Regtest, |mut ctx| async move {
let (alice_swap, alice_join_handle) = ctx.new_swap_as_alice().await;
let (bob_swap, _) = ctx.new_swap_as_bob().await;
let bob = bob::run(bob_swap);
let bob_handle = tokio::spawn(bob);
@ -18,15 +21,24 @@ async fn given_alice_restarts_after_enc_sig_learned_and_bob_already_cancelled_re
let alice_state = alice::run_until(alice_swap, is_encsig_learned)
.await
.unwrap();
assert!(matches!(alice_state, AliceState::EncSigLearned {..}));
assert!(
matches!(alice_state, AliceState::EncSigLearned {..}),
"Alice state is not EncSigLearned: {:?}",
alice_state
);
// Wait for Bob to refund, because Alice does not act
let bob_state = bob_handle.await.unwrap();
ctx.assert_bob_refunded(bob_state.unwrap()).await;
// Once bob has finished Alice is restarted and refunds as well
let alice_swap = ctx.recover_alice_from_db().await;
assert!(matches!(alice_swap.state, AliceState::EncSigLearned {..}));
let alice_swap = ctx.stop_and_resume_alice_from_db(alice_join_handle).await;
assert!(
matches!(alice_swap.state, AliceState::EncSigLearned
{..}),
"Alice state is not EncSigLearned: {:?}",
alice_state
);
let alice_state = alice::run(alice_swap).await.unwrap();

View File

@ -7,13 +7,16 @@ use monero_harness::{image, Monero};
use std::{path::PathBuf, sync::Arc};
use swap::{
bitcoin,
config::Config,
bitcoin::Timelock,
config,
config::{Config, GetConfig},
monero,
protocol::{alice, alice::AliceState, bob, bob::BobState, SwapAmounts},
seed::Seed,
};
use tempfile::tempdir;
use testcontainers::{clients::Cli, Container};
use tokio::task::JoinHandle;
use tracing_core::dispatcher::DefaultGuard;
use tracing_log::LogTracer;
use uuid::Uuid;
@ -35,7 +38,7 @@ struct AliceParams {
}
impl AliceParams {
pub async fn builder(&self) -> alice::Builder {
pub fn builder(&self) -> alice::Builder {
alice::Builder::new(
self.seed,
self.config,
@ -45,14 +48,14 @@ impl AliceParams {
self.db_path.clone(),
self.listen_address.clone(),
)
.await
}
async fn peer_id(&self) -> PeerId {
self.builder().await.peer_id()
fn peer_id(&self) -> PeerId {
self.builder().peer_id()
}
}
#[derive(Debug, Clone)]
struct BobParams {
seed: Seed,
db_path: PathBuf,
@ -73,12 +76,16 @@ impl BobParams {
self.bitcoin_wallet.clone(),
self.monero_wallet.clone(),
self.alice_address.clone(),
self.alice_peer_id.clone(),
self.alice_peer_id,
self.config,
)
}
}
pub struct BobEventLoopJoinHandle(JoinHandle<()>);
pub struct AliceEventLoopJoinHandle(JoinHandle<()>);
pub struct TestContext {
swap_amounts: SwapAmounts,
@ -94,22 +101,21 @@ pub struct TestContext {
}
impl TestContext {
pub async fn new_swap_as_alice(&mut self) -> alice::Swap {
pub async fn new_swap_as_alice(&mut self) -> (alice::Swap, AliceEventLoopJoinHandle) {
let (swap, mut event_loop) = self
.alice_params
.builder()
.await
.with_init_params(self.swap_amounts)
.build()
.await
.unwrap();
tokio::spawn(async move { event_loop.run().await });
let join_handle = tokio::spawn(async move { event_loop.run().await });
swap
(swap, AliceEventLoopJoinHandle(join_handle))
}
pub async fn new_swap_as_bob(&mut self) -> bob::Swap {
pub async fn new_swap_as_bob(&mut self) -> (bob::Swap, BobEventLoopJoinHandle) {
let (swap, event_loop) = self
.bob_params
.builder()
@ -118,20 +124,30 @@ impl TestContext {
.await
.unwrap();
tokio::spawn(async move { event_loop.run().await });
let join_handle = tokio::spawn(async move { event_loop.run().await });
swap
(swap, BobEventLoopJoinHandle(join_handle))
}
pub async fn recover_alice_from_db(&mut self) -> alice::Swap {
let (swap, mut event_loop) = self.alice_params.builder().await.build().await.unwrap();
pub async fn stop_and_resume_alice_from_db(
&mut self,
join_handle: AliceEventLoopJoinHandle,
) -> alice::Swap {
join_handle.0.abort();
let (swap, mut event_loop) = self.alice_params.builder().build().await.unwrap();
tokio::spawn(async move { event_loop.run().await });
swap
}
pub async fn recover_bob_from_db(&mut self) -> bob::Swap {
pub async fn stop_and_resume_bob_from_db(
&mut self,
join_handle: BobEventLoopJoinHandle,
) -> bob::Swap {
join_handle.0.abort();
let (swap, event_loop) = self.bob_params.builder().build().await.unwrap();
tokio::spawn(async move { event_loop.run().await });
@ -203,7 +219,7 @@ impl TestContext {
let lock_tx_id = if let BobState::XmrRedeemed { tx_lock_id } = state {
tx_lock_id
} else {
panic!("Bob in unexpected state");
panic!("Bob in not in xmr redeemed state: {:?}", state);
};
let lock_tx_bitcoin_fee = self
@ -236,7 +252,7 @@ impl TestContext {
let lock_tx_id = if let BobState::BtcRefunded(state4) = state {
state4.tx_lock_id()
} else {
panic!("Bob in unexpected state");
panic!("Bob in not in btc refunded state: {:?}", state);
};
let lock_tx_bitcoin_fee = self
.bob_bitcoin_wallet
@ -268,7 +284,7 @@ impl TestContext {
let lock_tx_id = if let BobState::BtcPunished { tx_lock_id } = state {
tx_lock_id
} else {
panic!("Bob in unexpected state");
panic!("Bob in not in btc punished state: {:?}", state);
};
let lock_tx_bitcoin_fee = self
@ -288,15 +304,18 @@ impl TestContext {
}
}
pub async fn setup_test<T, F>(testfn: T)
pub async fn setup_test<T, F, C>(_config: C, testfn: T)
where
T: Fn(TestContext) -> F,
F: Future<Output = ()>,
C: GetConfig,
{
let cli = Cli::default();
let _guard = init_tracing();
let config = C::get_config();
let (monero, containers) = testutils::init_containers(&cli).await;
let swap_amounts = SwapAmounts {
@ -304,8 +323,6 @@ where
xmr: monero::Amount::from_piconero(1_000_000_000_000),
};
let config = Config::regtest();
let alice_starting_balances = StartingBalances {
xmr: swap_amounts.xmr * 10,
btc: bitcoin::Amount::ZERO,
@ -357,7 +374,7 @@ where
bitcoin_wallet: bob_bitcoin_wallet.clone(),
monero_wallet: bob_monero_wallet.clone(),
alice_address: alice_params.listen_address.clone(),
alice_peer_id: alice_params.peer_id().await,
alice_peer_id: alice_params.peer_id(),
config,
};
@ -497,3 +514,37 @@ pub mod bob_run_until {
matches!(state, BobState::EncSigSent(..))
}
}
pub struct SlowCancelConfig;
impl GetConfig for SlowCancelConfig {
fn get_config() -> Config {
Config {
bitcoin_cancel_timelock: Timelock::new(180),
..config::Regtest::get_config()
}
}
}
pub struct FastCancelConfig;
impl GetConfig for FastCancelConfig {
fn get_config() -> Config {
Config {
bitcoin_cancel_timelock: Timelock::new(1),
..config::Regtest::get_config()
}
}
}
pub struct FastPunishConfig;
impl GetConfig for FastPunishConfig {
fn get_config() -> Config {
Config {
bitcoin_cancel_timelock: Timelock::new(1),
bitcoin_punish_timelock: Timelock::new(1),
..config::Regtest::get_config()
}
}
}