Use Remote handle to access ongoing swaps on Alice

This commit is contained in:
Franck Royer 2021-02-15 15:00:47 +11:00
parent a0753e24dc
commit 65e0e5b731
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
3 changed files with 40 additions and 51 deletions

View File

@ -82,7 +82,7 @@ async fn main() -> Result<()> {
let (bitcoin_wallet, monero_wallet) = init_wallets(config.clone()).await?; let (bitcoin_wallet, monero_wallet) = init_wallets(config.clone()).await?;
let mut event_loop = EventLoop::new( let (mut event_loop, _) = EventLoop::new(
config.network.listen, config.network.listen,
seed, seed,
execution_params, execution_params,

View File

@ -6,20 +6,23 @@ use crate::{
network::{transport, TokioExecutor}, network::{transport, TokioExecutor},
protocol::{ protocol::{
alice, alice,
alice::{Behaviour, Builder, OutEvent, QuoteResponse, State0, State3, TransferProof}, alice::{
AliceState, Behaviour, Builder, OutEvent, QuoteResponse, State0, State3, TransferProof,
},
bob::{EncryptedSignature, QuoteRequest}, bob::{EncryptedSignature, QuoteRequest},
SwapAmounts, SwapAmounts,
}, },
seed::Seed, seed::Seed,
}; };
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
use futures::future::RemoteHandle;
use libp2p::{ use libp2p::{
core::Multiaddr, futures::FutureExt, request_response::ResponseChannel, PeerId, Swarm, core::Multiaddr, futures::FutureExt, request_response::ResponseChannel, PeerId, Swarm,
}; };
use rand::rngs::OsRng; use rand::rngs::OsRng;
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use tokio::sync::{broadcast, mpsc}; use tokio::sync::{broadcast, mpsc};
use tracing::{debug, error, trace}; use tracing::{debug, error, trace, warn};
use uuid::Uuid; use uuid::Uuid;
// TODO: Use dynamic // TODO: Use dynamic
@ -95,6 +98,8 @@ pub struct EventLoop {
// Only used to produce new handles // Only used to produce new handles
send_transfer_proof_sender: mpsc::Sender<(PeerId, TransferProof)>, send_transfer_proof_sender: mpsc::Sender<(PeerId, TransferProof)>,
swap_handle_sender: mpsc::Sender<RemoteHandle<Result<AliceState>>>,
} }
impl EventLoop { impl EventLoop {
@ -105,7 +110,7 @@ impl EventLoop {
bitcoin_wallet: Arc<bitcoin::Wallet>, bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>, monero_wallet: Arc<monero::Wallet>,
db: Arc<Database>, db: Arc<Database>,
) -> Result<Self> { ) -> Result<(Self, mpsc::Receiver<RemoteHandle<Result<AliceState>>>)> {
let identity = network::Seed::new(seed).derive_libp2p_identity(); let identity = network::Seed::new(seed).derive_libp2p_identity();
let behaviour = Behaviour::default(); let behaviour = Behaviour::default();
let transport = transport::build(&identity)?; let transport = transport::build(&identity)?;
@ -122,8 +127,9 @@ impl EventLoop {
let recv_encrypted_signature = BroadcastChannels::default(); let recv_encrypted_signature = BroadcastChannels::default();
let send_transfer_proof = MpscChannels::default(); let send_transfer_proof = MpscChannels::default();
let swap_handle = MpscChannels::default();
Ok(EventLoop { let event_loop = EventLoop {
swarm, swarm,
peer_id, peer_id,
execution_params, execution_params,
@ -135,7 +141,9 @@ impl EventLoop {
recv_encrypted_signature: recv_encrypted_signature.sender, recv_encrypted_signature: recv_encrypted_signature.sender,
send_transfer_proof: send_transfer_proof.receiver, send_transfer_proof: send_transfer_proof.receiver,
send_transfer_proof_sender: send_transfer_proof.sender, send_transfer_proof_sender: send_transfer_proof.sender,
}) swap_handle_sender: swap_handle.sender,
};
Ok((event_loop, swap_handle.receiver))
} }
pub fn new_handle(&self) -> EventLoopHandle { pub fn new_handle(&self) -> EventLoopHandle {
@ -257,7 +265,14 @@ impl EventLoop {
.build() .build()
.await?; .await?;
tokio::spawn(async move { alice::run(swap).await }); let (remote, remote_handle) = alice::run(swap).remote_handle();
tokio::spawn(async move { remote.await });
let _ = self
.swap_handle_sender
.send(remote_handle)
.await
.map_err(|err| warn!("Could not send swap handle over channel: {:?}", err));
Ok(()) Ok(())
} }

View File

@ -1,6 +1,6 @@
use crate::testutils; use crate::testutils;
use bitcoin_harness::Bitcoind; use bitcoin_harness::Bitcoind;
use futures::Future; use futures::{future::RemoteHandle, Future};
use get_port::get_port; use get_port::get_port;
use libp2p::{core::Multiaddr, PeerId}; use libp2p::{core::Multiaddr, PeerId};
use monero_harness::{image, Monero}; use monero_harness::{image, Monero};
@ -8,7 +8,7 @@ use std::{path::PathBuf, sync::Arc};
use swap::{ use swap::{
bitcoin, bitcoin,
bitcoin::Timelock, bitcoin::Timelock,
database::{Database, Swap}, database::Database,
execution_params, execution_params,
execution_params::{ExecutionParams, GetExecutionParams}, execution_params::{ExecutionParams, GetExecutionParams},
monero, monero,
@ -17,7 +17,7 @@ use swap::{
}; };
use tempfile::tempdir; use tempfile::tempdir;
use testcontainers::{clients::Cli, Container}; use testcontainers::{clients::Cli, Container};
use tokio::task::JoinHandle; use tokio::{sync::mpsc, task::JoinHandle};
use tracing_core::dispatcher::DefaultGuard; use tracing_core::dispatcher::DefaultGuard;
use tracing_log::LogTracer; use tracing_log::LogTracer;
use uuid::Uuid; use uuid::Uuid;
@ -28,13 +28,6 @@ pub struct StartingBalances {
pub btc: bitcoin::Amount, pub btc: bitcoin::Amount,
} }
struct AliceParams {
seed: Seed,
execution_params: ExecutionParams,
db: Arc<Database>,
listen_address: Multiaddr,
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct BobParams { struct BobParams {
seed: Seed, seed: Seed,
@ -75,10 +68,10 @@ pub struct AliceEventLoopJoinHandle(JoinHandle<()>);
pub struct TestContext { pub struct TestContext {
swap_amounts: SwapAmounts, swap_amounts: SwapAmounts,
alice_params: AliceParams,
alice_starting_balances: StartingBalances, alice_starting_balances: StartingBalances,
alice_bitcoin_wallet: Arc<bitcoin::Wallet>, alice_bitcoin_wallet: Arc<bitcoin::Wallet>,
alice_monero_wallet: Arc<monero::Wallet>, alice_monero_wallet: Arc<monero::Wallet>,
alice_swap_handle: mpsc::Receiver<RemoteHandle<anyhow::Result<AliceState>>>,
bob_params: BobParams, bob_params: BobParams,
bob_starting_balances: StartingBalances, bob_starting_balances: StartingBalances,
@ -114,16 +107,9 @@ impl TestContext {
(swap, BobEventLoopJoinHandle(join_handle)) (swap, BobEventLoopJoinHandle(join_handle))
} }
pub async fn assert_alice_redeemed(&self) { pub async fn assert_alice_redeemed(&mut self) {
let mut states = self.alice_params.db.all().unwrap(); let swap_handle = self.alice_swap_handle.recv().await.unwrap();
let state = swap_handle.await.unwrap();
assert_eq!(states.len(), 1, "Expected only one swap in Alice's db");
let (_swap_id, state) = states.pop().unwrap();
let state = match state {
Swap::Alice(state) => state.into(),
Swap::Bob(_) => panic!("Bob state in Alice db is unexpected"),
};
assert!(matches!(state, AliceState::BtcRedeemed)); assert!(matches!(state, AliceState::BtcRedeemed));
@ -143,16 +129,9 @@ impl TestContext {
assert!(xmr_balance_after_swap <= self.alice_starting_balances.xmr - self.swap_amounts.xmr); assert!(xmr_balance_after_swap <= self.alice_starting_balances.xmr - self.swap_amounts.xmr);
} }
pub async fn assert_alice_refunded(&self) { pub async fn assert_alice_refunded(&mut self) {
let mut states = self.alice_params.db.all().unwrap(); let swap_handle = self.alice_swap_handle.recv().await.unwrap();
let state = swap_handle.await.unwrap();
assert_eq!(states.len(), 1, "Expected only one swap in Alice's db");
let (_swap_id, state) = states.pop().unwrap();
let state = match state {
Swap::Alice(state) => state.into(),
Swap::Bob(_) => panic!("Bob state in Alice db is unexpected"),
};
assert!( assert!(
matches!(state, AliceState::XmrRefunded), matches!(state, AliceState::XmrRefunded),
@ -313,7 +292,7 @@ where
let port = get_port().expect("Failed to find a free port"); let port = get_port().expect("Failed to find a free port");
let listen_address: Multiaddr = format!("/ip4/127.0.0.1/tcp/{}", port) let alice_listen_address: Multiaddr = format!("/ip4/127.0.0.1/tcp/{}", port)
.parse() .parse()
.expect("failed to parse Alice's address"); .expect("failed to parse Alice's address");
@ -328,12 +307,7 @@ where
let db_path = tempdir().unwrap(); let db_path = tempdir().unwrap();
let alice_db = Arc::new(Database::open(db_path.path()).unwrap()); let alice_db = Arc::new(Database::open(db_path.path()).unwrap());
let alice_params = AliceParams { let alice_seed = Seed::random().unwrap();
seed: Seed::random().unwrap(),
execution_params,
db: alice_db.clone(),
listen_address,
};
let bob_starting_balances = StartingBalances { let bob_starting_balances = StartingBalances {
xmr: monero::Amount::ZERO, xmr: monero::Amount::ZERO,
@ -348,10 +322,10 @@ where
) )
.await; .await;
let mut alice_event_loop = alice::EventLoop::new( let (mut alice_event_loop, alice_swap_handle) = alice::EventLoop::new(
alice_params.listen_address.clone(), alice_listen_address.clone(),
alice_params.seed, alice_seed,
alice_params.execution_params, execution_params,
alice_bitcoin_wallet.clone(), alice_bitcoin_wallet.clone(),
alice_monero_wallet.clone(), alice_monero_wallet.clone(),
alice_db, alice_db,
@ -370,17 +344,17 @@ where
swap_id: Uuid::new_v4(), swap_id: Uuid::new_v4(),
bitcoin_wallet: bob_bitcoin_wallet.clone(), bitcoin_wallet: bob_bitcoin_wallet.clone(),
monero_wallet: bob_monero_wallet.clone(), monero_wallet: bob_monero_wallet.clone(),
alice_address: alice_params.listen_address.clone(), alice_address: alice_listen_address,
alice_peer_id, alice_peer_id,
execution_params, execution_params,
}; };
let test = TestContext { let test = TestContext {
swap_amounts, swap_amounts,
alice_params,
alice_starting_balances, alice_starting_balances,
alice_bitcoin_wallet, alice_bitcoin_wallet,
alice_monero_wallet, alice_monero_wallet,
alice_swap_handle,
bob_params, bob_params,
bob_starting_balances, bob_starting_balances,
bob_bitcoin_wallet, bob_bitcoin_wallet,