191: Use Remote handle to access ongoing swaps on Alice r=rishflab a=D4nte

This helps stabilizing the test as sometimes we were checking Alice state (in the DB) too early.

Co-authored-by: Franck Royer <franck@coblox.tech>
This commit is contained in:
bors[bot] 2021-02-15 04:18:24 +00:00 committed by GitHub
commit 6e6c207715
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 40 additions and 51 deletions

View File

@ -82,7 +82,7 @@ async fn main() -> Result<()> {
let (bitcoin_wallet, monero_wallet) = init_wallets(config.clone()).await?;
let mut event_loop = EventLoop::new(
let (mut event_loop, _) = EventLoop::new(
config.network.listen,
seed,
execution_params,

View File

@ -6,20 +6,23 @@ use crate::{
network::{transport, TokioExecutor},
protocol::{
alice,
alice::{Behaviour, Builder, OutEvent, QuoteResponse, State0, State3, TransferProof},
alice::{
AliceState, Behaviour, Builder, OutEvent, QuoteResponse, State0, State3, TransferProof,
},
bob::{EncryptedSignature, QuoteRequest},
SwapAmounts,
},
seed::Seed,
};
use anyhow::{anyhow, Context, Result};
use futures::future::RemoteHandle;
use libp2p::{
core::Multiaddr, futures::FutureExt, request_response::ResponseChannel, PeerId, Swarm,
};
use rand::rngs::OsRng;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{broadcast, mpsc};
use tracing::{debug, error, trace};
use tracing::{debug, error, trace, warn};
use uuid::Uuid;
// TODO: Use dynamic
@ -95,6 +98,8 @@ pub struct EventLoop {
// Only used to produce new handles
send_transfer_proof_sender: mpsc::Sender<(PeerId, TransferProof)>,
swap_handle_sender: mpsc::Sender<RemoteHandle<Result<AliceState>>>,
}
impl EventLoop {
@ -105,7 +110,7 @@ impl EventLoop {
bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>,
db: Arc<Database>,
) -> Result<Self> {
) -> Result<(Self, mpsc::Receiver<RemoteHandle<Result<AliceState>>>)> {
let identity = network::Seed::new(seed).derive_libp2p_identity();
let behaviour = Behaviour::default();
let transport = transport::build(&identity)?;
@ -122,8 +127,9 @@ impl EventLoop {
let recv_encrypted_signature = BroadcastChannels::default();
let send_transfer_proof = MpscChannels::default();
let swap_handle = MpscChannels::default();
Ok(EventLoop {
let event_loop = EventLoop {
swarm,
peer_id,
execution_params,
@ -135,7 +141,9 @@ impl EventLoop {
recv_encrypted_signature: recv_encrypted_signature.sender,
send_transfer_proof: send_transfer_proof.receiver,
send_transfer_proof_sender: send_transfer_proof.sender,
})
swap_handle_sender: swap_handle.sender,
};
Ok((event_loop, swap_handle.receiver))
}
pub fn new_handle(&self) -> EventLoopHandle {
@ -257,7 +265,14 @@ impl EventLoop {
.build()
.await?;
tokio::spawn(async move { alice::run(swap).await });
let (remote, remote_handle) = alice::run(swap).remote_handle();
tokio::spawn(async move { remote.await });
let _ = self
.swap_handle_sender
.send(remote_handle)
.await
.map_err(|err| warn!("Could not send swap handle over channel: {:?}", err));
Ok(())
}

View File

@ -1,6 +1,6 @@
use crate::testutils;
use bitcoin_harness::Bitcoind;
use futures::Future;
use futures::{future::RemoteHandle, Future};
use get_port::get_port;
use libp2p::{core::Multiaddr, PeerId};
use monero_harness::{image, Monero};
@ -8,7 +8,7 @@ use std::{path::PathBuf, sync::Arc};
use swap::{
bitcoin,
bitcoin::Timelock,
database::{Database, Swap},
database::Database,
execution_params,
execution_params::{ExecutionParams, GetExecutionParams},
monero,
@ -17,7 +17,7 @@ use swap::{
};
use tempfile::tempdir;
use testcontainers::{clients::Cli, Container};
use tokio::task::JoinHandle;
use tokio::{sync::mpsc, task::JoinHandle};
use tracing_core::dispatcher::DefaultGuard;
use tracing_log::LogTracer;
use uuid::Uuid;
@ -28,13 +28,6 @@ pub struct StartingBalances {
pub btc: bitcoin::Amount,
}
struct AliceParams {
seed: Seed,
execution_params: ExecutionParams,
db: Arc<Database>,
listen_address: Multiaddr,
}
#[derive(Debug, Clone)]
struct BobParams {
seed: Seed,
@ -75,10 +68,10 @@ pub struct AliceEventLoopJoinHandle(JoinHandle<()>);
pub struct TestContext {
swap_amounts: SwapAmounts,
alice_params: AliceParams,
alice_starting_balances: StartingBalances,
alice_bitcoin_wallet: Arc<bitcoin::Wallet>,
alice_monero_wallet: Arc<monero::Wallet>,
alice_swap_handle: mpsc::Receiver<RemoteHandle<anyhow::Result<AliceState>>>,
bob_params: BobParams,
bob_starting_balances: StartingBalances,
@ -114,16 +107,9 @@ impl TestContext {
(swap, BobEventLoopJoinHandle(join_handle))
}
pub async fn assert_alice_redeemed(&self) {
let mut states = self.alice_params.db.all().unwrap();
assert_eq!(states.len(), 1, "Expected only one swap in Alice's db");
let (_swap_id, state) = states.pop().unwrap();
let state = match state {
Swap::Alice(state) => state.into(),
Swap::Bob(_) => panic!("Bob state in Alice db is unexpected"),
};
pub async fn assert_alice_redeemed(&mut self) {
let swap_handle = self.alice_swap_handle.recv().await.unwrap();
let state = swap_handle.await.unwrap();
assert!(matches!(state, AliceState::BtcRedeemed));
@ -143,16 +129,9 @@ impl TestContext {
assert!(xmr_balance_after_swap <= self.alice_starting_balances.xmr - self.swap_amounts.xmr);
}
pub async fn assert_alice_refunded(&self) {
let mut states = self.alice_params.db.all().unwrap();
assert_eq!(states.len(), 1, "Expected only one swap in Alice's db");
let (_swap_id, state) = states.pop().unwrap();
let state = match state {
Swap::Alice(state) => state.into(),
Swap::Bob(_) => panic!("Bob state in Alice db is unexpected"),
};
pub async fn assert_alice_refunded(&mut self) {
let swap_handle = self.alice_swap_handle.recv().await.unwrap();
let state = swap_handle.await.unwrap();
assert!(
matches!(state, AliceState::XmrRefunded),
@ -313,7 +292,7 @@ where
let port = get_port().expect("Failed to find a free port");
let listen_address: Multiaddr = format!("/ip4/127.0.0.1/tcp/{}", port)
let alice_listen_address: Multiaddr = format!("/ip4/127.0.0.1/tcp/{}", port)
.parse()
.expect("failed to parse Alice's address");
@ -328,12 +307,7 @@ where
let db_path = tempdir().unwrap();
let alice_db = Arc::new(Database::open(db_path.path()).unwrap());
let alice_params = AliceParams {
seed: Seed::random().unwrap(),
execution_params,
db: alice_db.clone(),
listen_address,
};
let alice_seed = Seed::random().unwrap();
let bob_starting_balances = StartingBalances {
xmr: monero::Amount::ZERO,
@ -348,10 +322,10 @@ where
)
.await;
let mut alice_event_loop = alice::EventLoop::new(
alice_params.listen_address.clone(),
alice_params.seed,
alice_params.execution_params,
let (mut alice_event_loop, alice_swap_handle) = alice::EventLoop::new(
alice_listen_address.clone(),
alice_seed,
execution_params,
alice_bitcoin_wallet.clone(),
alice_monero_wallet.clone(),
alice_db,
@ -370,17 +344,17 @@ where
swap_id: Uuid::new_v4(),
bitcoin_wallet: bob_bitcoin_wallet.clone(),
monero_wallet: bob_monero_wallet.clone(),
alice_address: alice_params.listen_address.clone(),
alice_address: alice_listen_address,
alice_peer_id,
execution_params,
};
let test = TestContext {
swap_amounts,
alice_params,
alice_starting_balances,
alice_bitcoin_wallet,
alice_monero_wallet,
alice_swap_handle,
bob_params,
bob_starting_balances,
bob_bitcoin_wallet,