mirror of
https://github.com/comit-network/xmr-btc-swap.git
synced 2024-10-01 01:45:40 -04:00
Alice event loop now handles the creation of new swaps
This commit is contained in:
parent
15eb9a2fe4
commit
6e6dc320b4
@ -4,4 +4,7 @@ status = [
|
||||
"build_test (x86_64-apple-darwin)",
|
||||
"docker_tests (happy_path)",
|
||||
"docker_tests (happy_path_restart_bob_before_comm)",
|
||||
"docker_tests (bob_refunds_using_cancel_and_refund_command.rs)",
|
||||
"docker_tests (bob_refunds_using_cancel_and_refund_command_timelock_not_exired_force.rs)",
|
||||
"docker_tests (bob_refunds_using_cancel_and_refund_command_timelock_not_exired.rs)"
|
||||
]
|
||||
|
@ -14,10 +14,6 @@ use serde::{Deserialize, Serialize};
|
||||
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
|
||||
pub enum Alice {
|
||||
Started {
|
||||
amounts: SwapAmounts,
|
||||
state0: alice::State0,
|
||||
},
|
||||
Negotiated {
|
||||
state3: alice::State3,
|
||||
#[serde(with = "crate::serde_peer_id")]
|
||||
bob_peer_id: PeerId,
|
||||
@ -54,11 +50,11 @@ pub enum AliceEndState {
|
||||
impl From<&AliceState> for Alice {
|
||||
fn from(alice_state: &AliceState) -> Self {
|
||||
match alice_state {
|
||||
AliceState::Negotiated {
|
||||
AliceState::Started {
|
||||
state3,
|
||||
bob_peer_id,
|
||||
..
|
||||
} => Alice::Negotiated {
|
||||
} => Alice::Started {
|
||||
state3: state3.as_ref().clone(),
|
||||
bob_peer_id: *bob_peer_id,
|
||||
},
|
||||
@ -93,10 +89,6 @@ impl From<&AliceState> for Alice {
|
||||
}
|
||||
AliceState::BtcPunished => Alice::Done(AliceEndState::BtcPunished),
|
||||
AliceState::SafelyAborted => Alice::Done(AliceEndState::SafelyAborted),
|
||||
AliceState::Started { amounts, state0 } => Alice::Started {
|
||||
amounts: *amounts,
|
||||
state0: state0.clone(),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -104,11 +96,10 @@ impl From<&AliceState> for Alice {
|
||||
impl From<Alice> for AliceState {
|
||||
fn from(db_state: Alice) -> Self {
|
||||
match db_state {
|
||||
Alice::Started { amounts, state0 } => AliceState::Started { amounts, state0 },
|
||||
Alice::Negotiated {
|
||||
Alice::Started {
|
||||
state3,
|
||||
bob_peer_id,
|
||||
} => AliceState::Negotiated {
|
||||
} => AliceState::Started {
|
||||
bob_peer_id,
|
||||
amounts: SwapAmounts {
|
||||
btc: state3.btc,
|
||||
@ -186,7 +177,6 @@ impl Display for Alice {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Alice::Started { .. } => write!(f, "Started"),
|
||||
Alice::Negotiated { .. } => f.write_str("Negotiated"),
|
||||
Alice::BtcLocked { .. } => f.write_str("Bitcoin locked"),
|
||||
Alice::XmrLocked(_) => f.write_str("Monero locked"),
|
||||
Alice::CancelTimelockExpired(_) => f.write_str("Cancel timelock is expired"),
|
||||
|
@ -18,10 +18,10 @@ use libp2p::{
|
||||
/// - DNS name resolution
|
||||
/// - authentication via noise
|
||||
/// - multiplexing via yamux or mplex
|
||||
pub fn build(id_keys: identity::Keypair) -> Result<SwapTransport> {
|
||||
pub fn build(id_keys: &identity::Keypair) -> Result<SwapTransport> {
|
||||
use libp2p::tcp::TokioTcpConfig;
|
||||
|
||||
let dh_keys = noise::Keypair::<X25519Spec>::new().into_authentic(&id_keys)?;
|
||||
let dh_keys = noise::Keypair::<X25519Spec>::new().into_authentic(id_keys)?;
|
||||
let noise = NoiseConfig::xx(dh_keys).into_authenticated();
|
||||
|
||||
let tcp = TokioTcpConfig::new().nodelay(true);
|
||||
|
@ -2,15 +2,15 @@
|
||||
//! Alice holds XMR and wishes receive BTC.
|
||||
use crate::{
|
||||
bitcoin, database, database::Database, execution_params::ExecutionParams, monero,
|
||||
network::Seed as NetworkSeed, protocol::SwapAmounts, seed::Seed,
|
||||
protocol::SwapAmounts,
|
||||
};
|
||||
use anyhow::{bail, Result};
|
||||
use libp2p::{core::Multiaddr, identity::Keypair, PeerId};
|
||||
use rand::rngs::OsRng;
|
||||
use libp2p::{core::Multiaddr, PeerId};
|
||||
use std::sync::Arc;
|
||||
use uuid::Uuid;
|
||||
|
||||
pub use self::{
|
||||
behaviour::{Behaviour, OutEvent},
|
||||
event_loop::{EventLoop, EventLoopHandle},
|
||||
execution_setup::Message1,
|
||||
state::*,
|
||||
@ -37,16 +37,15 @@ pub struct Swap {
|
||||
pub monero_wallet: Arc<monero::Wallet>,
|
||||
pub execution_params: ExecutionParams,
|
||||
pub swap_id: Uuid,
|
||||
pub db: Database,
|
||||
pub db: Arc<Database>,
|
||||
}
|
||||
|
||||
pub struct Builder {
|
||||
swap_id: Uuid,
|
||||
identity: Keypair,
|
||||
peer_id: PeerId,
|
||||
db: Database,
|
||||
db: Arc<Database>,
|
||||
execution_params: ExecutionParams,
|
||||
|
||||
event_loop_handle: EventLoopHandle,
|
||||
listen_address: Multiaddr,
|
||||
|
||||
bitcoin_wallet: Arc<bitcoin::Wallet>,
|
||||
@ -57,29 +56,31 @@ pub struct Builder {
|
||||
|
||||
enum InitParams {
|
||||
None,
|
||||
New { swap_amounts: SwapAmounts },
|
||||
New {
|
||||
swap_amounts: SwapAmounts,
|
||||
bob_peer_id: PeerId,
|
||||
state3: Box<State3>,
|
||||
},
|
||||
}
|
||||
|
||||
impl Builder {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
seed: Seed,
|
||||
self_peer_id: PeerId,
|
||||
execution_params: ExecutionParams,
|
||||
swap_id: Uuid,
|
||||
bitcoin_wallet: Arc<bitcoin::Wallet>,
|
||||
monero_wallet: Arc<monero::Wallet>,
|
||||
db: Database,
|
||||
db: Arc<Database>,
|
||||
listen_address: Multiaddr,
|
||||
event_loop_handle: EventLoopHandle,
|
||||
) -> Self {
|
||||
let network_seed = NetworkSeed::new(seed);
|
||||
let identity = network_seed.derive_libp2p_identity();
|
||||
let peer_id = PeerId::from(identity.public());
|
||||
|
||||
Self {
|
||||
swap_id,
|
||||
identity,
|
||||
peer_id,
|
||||
peer_id: self_peer_id,
|
||||
db,
|
||||
execution_params,
|
||||
event_loop_handle,
|
||||
listen_address,
|
||||
bitcoin_wallet,
|
||||
monero_wallet,
|
||||
@ -87,35 +88,44 @@ impl Builder {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_init_params(self, swap_amounts: SwapAmounts) -> Self {
|
||||
pub fn with_init_params(
|
||||
self,
|
||||
swap_amounts: SwapAmounts,
|
||||
bob_peer_id: PeerId,
|
||||
state3: State3,
|
||||
) -> Self {
|
||||
Self {
|
||||
init_params: InitParams::New { swap_amounts },
|
||||
init_params: InitParams::New {
|
||||
swap_amounts,
|
||||
bob_peer_id,
|
||||
state3: Box::new(state3),
|
||||
},
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn build(self) -> Result<(Swap, EventLoop)> {
|
||||
pub async fn build(self) -> Result<Swap> {
|
||||
match self.init_params {
|
||||
InitParams::New { swap_amounts } => {
|
||||
let initial_state = self
|
||||
.make_initial_state(swap_amounts.btc, swap_amounts.xmr)
|
||||
.await?;
|
||||
InitParams::New {
|
||||
swap_amounts,
|
||||
bob_peer_id,
|
||||
ref state3,
|
||||
} => {
|
||||
let initial_state = AliceState::Started {
|
||||
amounts: swap_amounts,
|
||||
state3: state3.clone(),
|
||||
bob_peer_id,
|
||||
};
|
||||
|
||||
let (event_loop, event_loop_handle) =
|
||||
EventLoop::new(self.identity.clone(), self.listen_address(), self.peer_id)?;
|
||||
|
||||
Ok((
|
||||
Swap {
|
||||
event_loop_handle,
|
||||
bitcoin_wallet: self.bitcoin_wallet,
|
||||
monero_wallet: self.monero_wallet,
|
||||
execution_params: self.execution_params,
|
||||
db: self.db,
|
||||
state: initial_state,
|
||||
swap_id: self.swap_id,
|
||||
},
|
||||
event_loop,
|
||||
))
|
||||
Ok(Swap {
|
||||
event_loop_handle: self.event_loop_handle,
|
||||
bitcoin_wallet: self.bitcoin_wallet,
|
||||
monero_wallet: self.monero_wallet,
|
||||
execution_params: self.execution_params,
|
||||
db: self.db,
|
||||
state: initial_state,
|
||||
swap_id: self.swap_id,
|
||||
})
|
||||
}
|
||||
InitParams::None => {
|
||||
let resume_state =
|
||||
@ -128,21 +138,15 @@ impl Builder {
|
||||
)
|
||||
};
|
||||
|
||||
let (event_loop, event_loop_handle) =
|
||||
EventLoop::new(self.identity.clone(), self.listen_address(), self.peer_id)?;
|
||||
|
||||
Ok((
|
||||
Swap {
|
||||
state: resume_state,
|
||||
event_loop_handle,
|
||||
bitcoin_wallet: self.bitcoin_wallet,
|
||||
monero_wallet: self.monero_wallet,
|
||||
execution_params: self.execution_params,
|
||||
swap_id: self.swap_id,
|
||||
db: self.db,
|
||||
},
|
||||
event_loop,
|
||||
))
|
||||
Ok(Swap {
|
||||
state: resume_state,
|
||||
event_loop_handle: self.event_loop_handle,
|
||||
bitcoin_wallet: self.bitcoin_wallet,
|
||||
monero_wallet: self.monero_wallet,
|
||||
execution_params: self.execution_params,
|
||||
swap_id: self.swap_id,
|
||||
db: self.db,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -154,26 +158,4 @@ impl Builder {
|
||||
pub fn listen_address(&self) -> Multiaddr {
|
||||
self.listen_address.clone()
|
||||
}
|
||||
|
||||
async fn make_initial_state(
|
||||
&self,
|
||||
btc_to_swap: bitcoin::Amount,
|
||||
xmr_to_swap: monero::Amount,
|
||||
) -> Result<AliceState> {
|
||||
let amounts = SwapAmounts {
|
||||
btc: btc_to_swap,
|
||||
xmr: xmr_to_swap,
|
||||
};
|
||||
|
||||
let state0 = State0::new(
|
||||
amounts.btc,
|
||||
amounts.xmr,
|
||||
self.execution_params,
|
||||
self.bitcoin_wallet.as_ref(),
|
||||
&mut OsRng,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(AliceState::Started { amounts, state0 })
|
||||
}
|
||||
}
|
||||
|
@ -1,7 +1,6 @@
|
||||
use crate::{
|
||||
network::{peer_tracker, peer_tracker::PeerTracker},
|
||||
protocol::{
|
||||
alice,
|
||||
alice::{
|
||||
encrypted_signature, execution_setup, swap_response, transfer_proof, State0, State3,
|
||||
SwapResponse, TransferProof,
|
||||
@ -19,8 +18,12 @@ pub enum OutEvent {
|
||||
SwapRequest {
|
||||
msg: SwapRequest,
|
||||
channel: ResponseChannel<SwapResponse>,
|
||||
bob_peer_id: PeerId,
|
||||
},
|
||||
ExecutionSetupDone {
|
||||
bob_peer_id: PeerId,
|
||||
state3: Box<State3>,
|
||||
},
|
||||
ExecutionSetupDone(Box<State3>),
|
||||
TransferProofAcknowledged,
|
||||
EncryptedSignature {
|
||||
msg: Box<EncryptedSignature>,
|
||||
@ -40,11 +43,19 @@ impl From<peer_tracker::OutEvent> for OutEvent {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<alice::OutEvent> for OutEvent {
|
||||
fn from(event: alice::OutEvent) -> Self {
|
||||
use crate::protocol::alice::OutEvent::*;
|
||||
impl From<swap_response::OutEvent> for OutEvent {
|
||||
fn from(event: swap_response::OutEvent) -> Self {
|
||||
use crate::protocol::alice::swap_response::OutEvent::*;
|
||||
match event {
|
||||
MsgReceived { msg, channel } => OutEvent::SwapRequest { msg, channel },
|
||||
MsgReceived {
|
||||
msg,
|
||||
channel,
|
||||
bob_peer_id,
|
||||
} => OutEvent::SwapRequest {
|
||||
msg,
|
||||
channel,
|
||||
bob_peer_id,
|
||||
},
|
||||
ResponseSent => OutEvent::ResponseSent,
|
||||
Failure(err) => OutEvent::Failure(err.context("Swap Request/Response failure")),
|
||||
}
|
||||
@ -55,7 +66,13 @@ impl From<execution_setup::OutEvent> for OutEvent {
|
||||
fn from(event: execution_setup::OutEvent) -> Self {
|
||||
use crate::protocol::alice::execution_setup::OutEvent::*;
|
||||
match event {
|
||||
Done(state3) => OutEvent::ExecutionSetupDone(Box::new(state3)),
|
||||
Done {
|
||||
bob_peer_id,
|
||||
state3,
|
||||
} => OutEvent::ExecutionSetupDone {
|
||||
bob_peer_id,
|
||||
state3: Box::new(state3),
|
||||
},
|
||||
Failure(err) => OutEvent::Failure(err),
|
||||
}
|
||||
}
|
||||
|
@ -1,19 +1,29 @@
|
||||
use crate::{
|
||||
bitcoin,
|
||||
database::Database,
|
||||
execution_params::ExecutionParams,
|
||||
monero, network,
|
||||
network::{transport, TokioExecutor},
|
||||
protocol::{
|
||||
alice::{
|
||||
behaviour::{Behaviour, OutEvent},
|
||||
State3, SwapResponse, TransferProof,
|
||||
},
|
||||
alice,
|
||||
alice::{Behaviour, Builder, OutEvent, State0, State3, SwapResponse, TransferProof},
|
||||
bob::{EncryptedSignature, SwapRequest},
|
||||
SwapAmounts,
|
||||
},
|
||||
seed::Seed,
|
||||
};
|
||||
use anyhow::{Context, Result};
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use libp2p::{
|
||||
core::Multiaddr, futures::FutureExt, request_response::ResponseChannel, PeerId, Swarm,
|
||||
};
|
||||
use rand::rngs::OsRng;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use tokio::sync::{broadcast, mpsc};
|
||||
use tracing::{debug, error, trace};
|
||||
use uuid::Uuid;
|
||||
|
||||
// TODO: Use dynamic
|
||||
const RATE: u32 = 100;
|
||||
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct MpscChannels<T> {
|
||||
@ -34,7 +44,6 @@ where
|
||||
T: Clone,
|
||||
{
|
||||
sender: broadcast::Sender<T>,
|
||||
receiver: broadcast::Receiver<T>,
|
||||
}
|
||||
|
||||
impl<T> Default for BroadcastChannels<T>
|
||||
@ -42,8 +51,8 @@ where
|
||||
T: Clone,
|
||||
{
|
||||
fn default() -> Self {
|
||||
let (sender, receiver) = broadcast::channel(100);
|
||||
BroadcastChannels { sender, receiver }
|
||||
let (sender, _receiver) = broadcast::channel(100);
|
||||
BroadcastChannels { sender }
|
||||
}
|
||||
}
|
||||
|
||||
@ -70,21 +79,37 @@ impl EventLoopHandle {
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct EventLoop {
|
||||
swarm: libp2p::Swarm<Behaviour>,
|
||||
peer_id: PeerId,
|
||||
execution_params: ExecutionParams,
|
||||
bitcoin_wallet: Arc<bitcoin::Wallet>,
|
||||
monero_wallet: Arc<monero::Wallet>,
|
||||
db: Arc<Database>,
|
||||
listen_address: Multiaddr,
|
||||
|
||||
// Amounts agreed upon for swaps currently in the execution setup phase
|
||||
// Note: We can do one execution setup per peer at a given time.
|
||||
swap_amounts: HashMap<PeerId, SwapAmounts>,
|
||||
|
||||
recv_encrypted_signature: broadcast::Sender<EncryptedSignature>,
|
||||
send_transfer_proof: mpsc::Receiver<(PeerId, TransferProof)>,
|
||||
|
||||
// Only used to clone further handles
|
||||
handle: EventLoopHandle,
|
||||
// Only used to produce new handles
|
||||
send_transfer_proof_sender: mpsc::Sender<(PeerId, TransferProof)>,
|
||||
}
|
||||
|
||||
impl EventLoop {
|
||||
pub fn new(
|
||||
identity: libp2p::identity::Keypair,
|
||||
listen: Multiaddr,
|
||||
peer_id: PeerId,
|
||||
) -> Result<(Self, EventLoopHandle)> {
|
||||
listen_address: Multiaddr,
|
||||
seed: Seed,
|
||||
execution_params: ExecutionParams,
|
||||
bitcoin_wallet: Arc<bitcoin::Wallet>,
|
||||
monero_wallet: Arc<monero::Wallet>,
|
||||
db: Arc<Database>,
|
||||
) -> Result<Self> {
|
||||
let identity = network::Seed::new(seed).derive_libp2p_identity();
|
||||
let behaviour = Behaviour::default();
|
||||
let transport = transport::build(identity)?;
|
||||
let transport = transport::build(&identity)?;
|
||||
let peer_id = PeerId::from(identity.public());
|
||||
|
||||
let mut swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, peer_id)
|
||||
.executor(Box::new(TokioExecutor {
|
||||
@ -92,39 +117,38 @@ impl EventLoop {
|
||||
}))
|
||||
.build();
|
||||
|
||||
Swarm::listen_on(&mut swarm, listen.clone())
|
||||
.with_context(|| format!("Address is not supported: {:#}", listen))?;
|
||||
Swarm::listen_on(&mut swarm, listen_address.clone())
|
||||
.with_context(|| format!("Address is not supported: {:#}", listen_address))?;
|
||||
|
||||
let recv_encrypted_signature = BroadcastChannels::default();
|
||||
let send_transfer_proof = MpscChannels::default();
|
||||
|
||||
let handle_clone = EventLoopHandle {
|
||||
recv_encrypted_signature: recv_encrypted_signature.sender.subscribe(),
|
||||
send_transfer_proof: send_transfer_proof.sender.clone(),
|
||||
};
|
||||
|
||||
let driver = EventLoop {
|
||||
Ok(EventLoop {
|
||||
swarm,
|
||||
peer_id,
|
||||
execution_params,
|
||||
bitcoin_wallet,
|
||||
monero_wallet,
|
||||
db,
|
||||
listen_address,
|
||||
swap_amounts: Default::default(),
|
||||
recv_encrypted_signature: recv_encrypted_signature.sender,
|
||||
send_transfer_proof: send_transfer_proof.receiver,
|
||||
handle: handle_clone,
|
||||
};
|
||||
|
||||
let handle = EventLoopHandle {
|
||||
recv_encrypted_signature: recv_encrypted_signature.receiver,
|
||||
send_transfer_proof: send_transfer_proof.sender,
|
||||
};
|
||||
|
||||
Ok((driver, handle))
|
||||
send_transfer_proof_sender: send_transfer_proof.sender,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn clone_handle(&self) -> EventLoopHandle {
|
||||
pub fn new_handle(&self) -> EventLoopHandle {
|
||||
EventLoopHandle {
|
||||
recv_encrypted_signature: self.recv_encrypted_signature.subscribe(),
|
||||
send_transfer_proof: self.handle.send_transfer_proof.clone(),
|
||||
send_transfer_proof: self.send_transfer_proof_sender.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn peer_id(&self) -> PeerId {
|
||||
self.peer_id
|
||||
}
|
||||
|
||||
pub async fn run(&mut self) {
|
||||
loop {
|
||||
tokio::select! {
|
||||
@ -133,11 +157,11 @@ impl EventLoop {
|
||||
OutEvent::ConnectionEstablished(alice) => {
|
||||
debug!("Connection Established with {}", alice);
|
||||
}
|
||||
OutEvent::SwapRequest { msg, channel } => {
|
||||
let _ = self.handle_swap_request(msg, channel).await;
|
||||
OutEvent::SwapRequest { msg, channel, bob_peer_id } => {
|
||||
let _ = self.handle_swap_request(msg, channel, bob_peer_id).await;
|
||||
}
|
||||
OutEvent::ExecutionSetupDone(state3) => {
|
||||
let _ = self.handle_execution_setup_done(*state3).await;
|
||||
OutEvent::ExecutionSetupDone{bob_peer_id, state3} => {
|
||||
let _ = self.handle_execution_setup_done(bob_peer_id, *state3).await;
|
||||
}
|
||||
OutEvent::TransferProofAcknowledged => {
|
||||
trace!("Bob acknowledged transfer proof");
|
||||
@ -165,11 +189,76 @@ impl EventLoop {
|
||||
}
|
||||
|
||||
async fn handle_swap_request(
|
||||
&self,
|
||||
_msg: SwapRequest,
|
||||
_channel: ResponseChannel<SwapResponse>,
|
||||
) {
|
||||
&mut self,
|
||||
swap_request: SwapRequest,
|
||||
channel: ResponseChannel<SwapResponse>,
|
||||
bob_peer_id: PeerId,
|
||||
) -> Result<()> {
|
||||
// 1. Check if acceptable request
|
||||
// 2. Send response
|
||||
|
||||
let btc_amount = swap_request.btc_amount;
|
||||
let xmr_amount = btc_amount.as_btc() * RATE as f64;
|
||||
let xmr_amount = monero::Amount::from_monero(xmr_amount)?;
|
||||
let swap_response = SwapResponse { xmr_amount };
|
||||
|
||||
self.swarm
|
||||
.send_swap_response(channel, swap_response)
|
||||
.context("Failed to send swap response")?;
|
||||
|
||||
// 3. Start setup execution
|
||||
|
||||
let state0 = State0::new(
|
||||
btc_amount,
|
||||
xmr_amount,
|
||||
self.execution_params,
|
||||
self.bitcoin_wallet.as_ref(),
|
||||
&mut OsRng,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// if a node restart during execution setup, the swap is aborted (safely).
|
||||
self.swap_amounts.insert(bob_peer_id, SwapAmounts {
|
||||
btc: btc_amount,
|
||||
xmr: xmr_amount,
|
||||
});
|
||||
|
||||
self.swarm.start_execution_setup(bob_peer_id, state0);
|
||||
// Continues once the execution setup protocol is done
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_execution_setup_done(&self, _state3: State3) {}
|
||||
async fn handle_execution_setup_done(
|
||||
&mut self,
|
||||
bob_peer_id: PeerId,
|
||||
state3: State3,
|
||||
) -> Result<()> {
|
||||
let swap_id = Uuid::new_v4();
|
||||
let handle = self.new_handle();
|
||||
|
||||
let swap_amounts = self.swap_amounts.remove(&bob_peer_id).ok_or_else(|| {
|
||||
anyhow!(
|
||||
"execution setup done for an unknown peer id: {}, node restarted in between?",
|
||||
bob_peer_id
|
||||
)
|
||||
})?;
|
||||
|
||||
let swap = Builder::new(
|
||||
self.peer_id,
|
||||
self.execution_params,
|
||||
swap_id,
|
||||
self.bitcoin_wallet.clone(),
|
||||
self.monero_wallet.clone(),
|
||||
self.db.clone(),
|
||||
self.listen_address.clone(),
|
||||
handle,
|
||||
)
|
||||
.with_init_params(swap_amounts, bob_peer_id, state3)
|
||||
.build()
|
||||
.await?;
|
||||
|
||||
tokio::spawn(async move { alice::run(swap).await });
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
@ -32,14 +32,17 @@ pub struct Message3 {
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum OutEvent {
|
||||
Done(State3),
|
||||
Done { bob_peer_id: PeerId, state3: State3 },
|
||||
Failure(Error),
|
||||
}
|
||||
|
||||
impl From<BehaviourOutEvent<State3, (), Error>> for OutEvent {
|
||||
fn from(event: BehaviourOutEvent<State3, (), Error>) -> Self {
|
||||
impl From<BehaviourOutEvent<(PeerId, State3), (), Error>> for OutEvent {
|
||||
fn from(event: BehaviourOutEvent<(PeerId, State3), (), Error>) -> Self {
|
||||
match event {
|
||||
BehaviourOutEvent::Inbound(_, Ok(State3)) => OutEvent::Done(State3),
|
||||
BehaviourOutEvent::Inbound(_, Ok((bob_peer_id, state3))) => OutEvent::Done {
|
||||
bob_peer_id,
|
||||
state3,
|
||||
},
|
||||
BehaviourOutEvent::Inbound(_, Err(e)) => OutEvent::Failure(e),
|
||||
BehaviourOutEvent::Outbound(..) => unreachable!("Alice only supports inbound"),
|
||||
}
|
||||
@ -49,7 +52,7 @@ impl From<BehaviourOutEvent<State3, (), Error>> for OutEvent {
|
||||
#[derive(libp2p::NetworkBehaviour)]
|
||||
#[behaviour(out_event = "OutEvent", event_process = false)]
|
||||
pub struct Behaviour {
|
||||
inner: libp2p_async_await::Behaviour<State3, (), anyhow::Error>,
|
||||
inner: libp2p_async_await::Behaviour<(PeerId, State3), (), anyhow::Error>,
|
||||
}
|
||||
|
||||
impl Default for Behaviour {
|
||||
@ -93,7 +96,7 @@ impl Behaviour {
|
||||
.context("failed to deserialize message4")?;
|
||||
let state3 = state2.receive(message4)?;
|
||||
|
||||
Ok(state3)
|
||||
Ok((bob, state3))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -25,10 +25,6 @@ use std::fmt;
|
||||
#[derive(Debug)]
|
||||
pub enum AliceState {
|
||||
Started {
|
||||
amounts: SwapAmounts,
|
||||
state0: State0,
|
||||
},
|
||||
Negotiated {
|
||||
bob_peer_id: PeerId,
|
||||
amounts: SwapAmounts,
|
||||
state3: Box<State3>,
|
||||
@ -70,7 +66,6 @@ impl fmt::Display for AliceState {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
AliceState::Started { .. } => write!(f, "started"),
|
||||
AliceState::Negotiated { .. } => write!(f, "negotiated"),
|
||||
AliceState::BtcLocked { .. } => write!(f, "btc is locked"),
|
||||
AliceState::XmrLocked { .. } => write!(f, "xmr is locked"),
|
||||
AliceState::EncSigLearned { .. } => write!(f, "encrypted signature is learned"),
|
||||
|
@ -84,36 +84,14 @@ async fn run_until_internal(
|
||||
monero_wallet: Arc<monero::Wallet>,
|
||||
execution_params: ExecutionParams,
|
||||
swap_id: Uuid,
|
||||
db: Database,
|
||||
db: Arc<Database>,
|
||||
) -> Result<AliceState> {
|
||||
info!("Current state:{}", state);
|
||||
if is_target_state(&state) {
|
||||
Ok(state)
|
||||
} else {
|
||||
match state {
|
||||
AliceState::Started { amounts, state0 } => {
|
||||
let state = AliceState::Negotiated {
|
||||
bob_peer_id: todo!(),
|
||||
amounts,
|
||||
state3: todo!(),
|
||||
};
|
||||
|
||||
let db_state = (&state).into();
|
||||
db.insert_latest_state(swap_id, database::Swap::Alice(db_state))
|
||||
.await?;
|
||||
run_until_internal(
|
||||
state,
|
||||
is_target_state,
|
||||
event_loop_handle,
|
||||
bitcoin_wallet,
|
||||
monero_wallet,
|
||||
execution_params,
|
||||
swap_id,
|
||||
db,
|
||||
)
|
||||
.await
|
||||
}
|
||||
AliceState::Negotiated {
|
||||
AliceState::Started {
|
||||
state3,
|
||||
bob_peer_id,
|
||||
amounts,
|
||||
|
@ -9,7 +9,7 @@ use libp2p::{
|
||||
ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent,
|
||||
RequestResponseMessage, ResponseChannel,
|
||||
},
|
||||
NetworkBehaviour,
|
||||
NetworkBehaviour, PeerId,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::time::Duration;
|
||||
@ -20,6 +20,7 @@ pub enum OutEvent {
|
||||
MsgReceived {
|
||||
msg: SwapRequest,
|
||||
channel: ResponseChannel<SwapResponse>,
|
||||
bob_peer_id: PeerId,
|
||||
},
|
||||
ResponseSent,
|
||||
Failure(Error),
|
||||
@ -45,6 +46,7 @@ impl From<RequestResponseEvent<SwapRequest, SwapResponse>> for OutEvent {
|
||||
OutEvent::MsgReceived {
|
||||
msg: request,
|
||||
channel,
|
||||
bob_peer_id: peer,
|
||||
}
|
||||
}
|
||||
RequestResponseEvent::Message {
|
||||
|
@ -168,7 +168,7 @@ impl Builder {
|
||||
&self,
|
||||
) -> Result<(bob::event_loop::EventLoop, bob::event_loop::EventLoopHandle)> {
|
||||
let bob_behaviour = bob::Behaviour::default();
|
||||
let bob_transport = build(self.identity.clone())?;
|
||||
let bob_transport = build(&self.identity)?;
|
||||
|
||||
bob::event_loop::EventLoop::new(
|
||||
bob_transport,
|
||||
|
@ -1,17 +1,13 @@
|
||||
pub mod testutils;
|
||||
|
||||
use swap::protocol::{alice, bob, bob::BobState};
|
||||
use swap::protocol::{bob, bob::BobState};
|
||||
use testutils::{bob_run_until::is_btc_locked, FastCancelConfig};
|
||||
|
||||
#[tokio::test]
|
||||
async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() {
|
||||
testutils::setup_test(FastCancelConfig, |mut ctx| async move {
|
||||
let (alice_swap, _) = ctx.new_swap_as_alice().await;
|
||||
let (bob_swap, bob_join_handle) = ctx.new_swap_as_bob().await;
|
||||
|
||||
let alice_handle = alice::run(alice_swap);
|
||||
let alice_swap_handle = tokio::spawn(alice_handle);
|
||||
|
||||
let bob_state = bob::run_until(bob_swap, is_btc_locked).await.unwrap();
|
||||
|
||||
assert!(matches!(bob_state, BobState::BtcLocked { .. }));
|
||||
@ -29,6 +25,7 @@ async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() {
|
||||
}
|
||||
|
||||
// Bob manually cancels
|
||||
bob_join_handle.abort();
|
||||
let (_, state) = bob::cancel(
|
||||
bob_swap.swap_id,
|
||||
bob_swap.state,
|
||||
@ -41,10 +38,11 @@ async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() {
|
||||
.unwrap();
|
||||
assert!(matches!(state, BobState::BtcCancelled { .. }));
|
||||
|
||||
let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await;
|
||||
let (bob_swap, bob_join_handle) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await;
|
||||
assert!(matches!(bob_swap.state, BobState::BtcCancelled { .. }));
|
||||
|
||||
// Bob manually refunds
|
||||
bob_join_handle.abort();
|
||||
let bob_state = bob::refund(
|
||||
bob_swap.swap_id,
|
||||
bob_swap.state,
|
||||
@ -58,9 +56,6 @@ async fn given_bob_manually_refunds_after_btc_locked_bob_refunds() {
|
||||
.unwrap();
|
||||
|
||||
ctx.assert_bob_refunded(bob_state).await;
|
||||
|
||||
let alice_state = alice_swap_handle.await.unwrap().unwrap();
|
||||
ctx.assert_alice_refunded(alice_state).await;
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
@ -1,18 +1,14 @@
|
||||
pub mod testutils;
|
||||
|
||||
use bob::cancel::CancelError;
|
||||
use swap::protocol::{alice, bob, bob::BobState};
|
||||
use swap::protocol::{bob, bob::BobState};
|
||||
use testutils::{bob_run_until::is_btc_locked, SlowCancelConfig};
|
||||
|
||||
#[tokio::test]
|
||||
async fn given_bob_manually_cancels_when_timelock_not_expired_errors() {
|
||||
testutils::setup_test(SlowCancelConfig, |mut ctx| async move {
|
||||
let (alice_swap, _) = ctx.new_swap_as_alice().await;
|
||||
let (bob_swap, bob_join_handle) = ctx.new_swap_as_bob().await;
|
||||
|
||||
let alice_handle = alice::run(alice_swap);
|
||||
tokio::spawn(alice_handle);
|
||||
|
||||
let bob_state = bob::run_until(bob_swap, is_btc_locked).await.unwrap();
|
||||
assert!(matches!(bob_state, BobState::BtcLocked { .. }));
|
||||
|
||||
|
@ -1,17 +1,13 @@
|
||||
pub mod testutils;
|
||||
|
||||
use swap::protocol::{alice, bob, bob::BobState};
|
||||
use swap::protocol::{bob, bob::BobState};
|
||||
use testutils::{bob_run_until::is_btc_locked, SlowCancelConfig};
|
||||
|
||||
#[tokio::test]
|
||||
async fn given_bob_manually_forces_cancel_when_timelock_not_expired_errors() {
|
||||
testutils::setup_test(SlowCancelConfig, |mut ctx| async move {
|
||||
let (alice_swap, _) = ctx.new_swap_as_alice().await;
|
||||
let (bob_swap, bob_join_handle) = ctx.new_swap_as_bob().await;
|
||||
|
||||
let alice_handle = alice::run(alice_swap);
|
||||
tokio::spawn(alice_handle);
|
||||
|
||||
let bob_state = bob::run_until(bob_swap, is_btc_locked).await.unwrap();
|
||||
assert!(matches!(bob_state, BobState::BtcLocked { .. }));
|
||||
|
||||
|
@ -1,23 +1,18 @@
|
||||
pub mod testutils;
|
||||
|
||||
use swap::protocol::{alice, bob};
|
||||
use swap::protocol::bob;
|
||||
use testutils::SlowCancelConfig;
|
||||
use tokio::join;
|
||||
|
||||
/// Run the following tests with RUST_MIN_STACK=10000000
|
||||
|
||||
#[tokio::test]
|
||||
async fn happy_path() {
|
||||
testutils::setup_test(SlowCancelConfig, |mut ctx| async move {
|
||||
let (alice_swap, _) = ctx.new_swap_as_alice().await;
|
||||
let (bob_swap, _) = ctx.new_swap_as_bob().await;
|
||||
|
||||
let alice = alice::run(alice_swap);
|
||||
let bob = bob::run(bob_swap);
|
||||
let bob_state = bob::run(bob_swap).await;
|
||||
|
||||
let (alice_state, bob_state) = join!(alice, bob);
|
||||
|
||||
ctx.assert_alice_redeemed(alice_state.unwrap()).await;
|
||||
ctx.assert_alice_redeemed().await;
|
||||
ctx.assert_bob_redeemed(bob_state.unwrap()).await;
|
||||
})
|
||||
.await;
|
||||
|
@ -1,17 +1,13 @@
|
||||
pub mod testutils;
|
||||
|
||||
use swap::protocol::{alice, bob, bob::BobState};
|
||||
use swap::protocol::{bob, bob::BobState};
|
||||
use testutils::{bob_run_until::is_xmr_locked, SlowCancelConfig};
|
||||
|
||||
#[tokio::test]
|
||||
async fn given_bob_restarts_after_xmr_is_locked_resume_swap() {
|
||||
testutils::setup_test(SlowCancelConfig, |mut ctx| async move {
|
||||
let (alice_swap, _) = ctx.new_swap_as_alice().await;
|
||||
let (bob_swap, bob_join_handle) = ctx.new_swap_as_bob().await;
|
||||
|
||||
let alice_handle = alice::run(alice_swap);
|
||||
let alice_swap_handle = tokio::spawn(alice_handle);
|
||||
|
||||
let bob_state = bob::run_until(bob_swap, is_xmr_locked).await.unwrap();
|
||||
|
||||
assert!(matches!(bob_state, BobState::XmrLocked { .. }));
|
||||
@ -23,8 +19,7 @@ async fn given_bob_restarts_after_xmr_is_locked_resume_swap() {
|
||||
|
||||
ctx.assert_bob_redeemed(bob_state).await;
|
||||
|
||||
let alice_state = alice_swap_handle.await.unwrap();
|
||||
ctx.assert_alice_redeemed(alice_state.unwrap()).await;
|
||||
ctx.assert_alice_redeemed().await;
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
@ -8,6 +8,7 @@ use std::{path::PathBuf, sync::Arc};
|
||||
use swap::{
|
||||
bitcoin,
|
||||
bitcoin::Timelock,
|
||||
database::{Database, Swap},
|
||||
execution_params,
|
||||
execution_params::{ExecutionParams, GetExecutionParams},
|
||||
monero,
|
||||
@ -30,31 +31,10 @@ pub struct StartingBalances {
|
||||
struct AliceParams {
|
||||
seed: Seed,
|
||||
execution_params: ExecutionParams,
|
||||
swap_id: Uuid,
|
||||
bitcoin_wallet: Arc<bitcoin::Wallet>,
|
||||
monero_wallet: Arc<monero::Wallet>,
|
||||
db_path: PathBuf,
|
||||
db: Arc<Database>,
|
||||
listen_address: Multiaddr,
|
||||
}
|
||||
|
||||
impl AliceParams {
|
||||
pub fn builder(&self) -> alice::Builder {
|
||||
alice::Builder::new(
|
||||
self.seed,
|
||||
self.execution_params,
|
||||
self.swap_id,
|
||||
self.bitcoin_wallet.clone(),
|
||||
self.monero_wallet.clone(),
|
||||
self.db_path.clone(),
|
||||
self.listen_address.clone(),
|
||||
)
|
||||
}
|
||||
|
||||
fn peer_id(&self) -> PeerId {
|
||||
self.builder().peer_id()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct BobParams {
|
||||
seed: Seed,
|
||||
@ -84,6 +64,12 @@ impl BobParams {
|
||||
|
||||
pub struct BobEventLoopJoinHandle(JoinHandle<()>);
|
||||
|
||||
impl BobEventLoopJoinHandle {
|
||||
pub fn abort(&self) {
|
||||
self.0.abort()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AliceEventLoopJoinHandle(JoinHandle<()>);
|
||||
|
||||
pub struct TestContext {
|
||||
@ -101,20 +87,6 @@ pub struct TestContext {
|
||||
}
|
||||
|
||||
impl TestContext {
|
||||
pub async fn new_swap_as_alice(&mut self) -> (alice::Swap, AliceEventLoopJoinHandle) {
|
||||
let (swap, mut event_loop) = self
|
||||
.alice_params
|
||||
.builder()
|
||||
.with_init_params(self.swap_amounts)
|
||||
.build()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let join_handle = tokio::spawn(async move { event_loop.run().await });
|
||||
|
||||
(swap, AliceEventLoopJoinHandle(join_handle))
|
||||
}
|
||||
|
||||
pub async fn new_swap_as_bob(&mut self) -> (bob::Swap, BobEventLoopJoinHandle) {
|
||||
let (swap, event_loop) = self
|
||||
.bob_params
|
||||
@ -129,24 +101,11 @@ impl TestContext {
|
||||
(swap, BobEventLoopJoinHandle(join_handle))
|
||||
}
|
||||
|
||||
pub async fn stop_and_resume_alice_from_db(
|
||||
&mut self,
|
||||
join_handle: AliceEventLoopJoinHandle,
|
||||
) -> alice::Swap {
|
||||
join_handle.0.abort();
|
||||
|
||||
let (swap, mut event_loop) = self.alice_params.builder().build().await.unwrap();
|
||||
|
||||
tokio::spawn(async move { event_loop.run().await });
|
||||
|
||||
swap
|
||||
}
|
||||
|
||||
pub async fn stop_and_resume_bob_from_db(
|
||||
&mut self,
|
||||
join_handle: BobEventLoopJoinHandle,
|
||||
) -> (bob::Swap, BobEventLoopJoinHandle) {
|
||||
join_handle.0.abort();
|
||||
join_handle.abort();
|
||||
|
||||
let (swap, event_loop) = self.bob_params.builder().build().await.unwrap();
|
||||
|
||||
@ -155,7 +114,17 @@ impl TestContext {
|
||||
(swap, BobEventLoopJoinHandle(join_handle))
|
||||
}
|
||||
|
||||
pub async fn assert_alice_redeemed(&self, state: AliceState) {
|
||||
pub async fn assert_alice_redeemed(&self) {
|
||||
let mut states = self.alice_params.db.all().unwrap();
|
||||
|
||||
assert_eq!(states.len(), 1, "Expected only one swap in Alice's db");
|
||||
|
||||
let (_swap_id, state) = states.pop().unwrap();
|
||||
let state = match state {
|
||||
Swap::Alice(state) => state.into(),
|
||||
Swap::Bob(_) => panic!("Bob state in Alice db is unexpected"),
|
||||
};
|
||||
|
||||
assert!(matches!(state, AliceState::BtcRedeemed));
|
||||
|
||||
let btc_balance_after_swap = self.alice_bitcoin_wallet.as_ref().balance().await.unwrap();
|
||||
@ -174,8 +143,22 @@ impl TestContext {
|
||||
assert!(xmr_balance_after_swap <= self.alice_starting_balances.xmr - self.swap_amounts.xmr);
|
||||
}
|
||||
|
||||
pub async fn assert_alice_refunded(&self, state: AliceState) {
|
||||
assert!(matches!(state, AliceState::XmrRefunded));
|
||||
pub async fn assert_alice_refunded(&self) {
|
||||
let mut states = self.alice_params.db.all().unwrap();
|
||||
|
||||
assert_eq!(states.len(), 1, "Expected only one swap in Alice's db");
|
||||
|
||||
let (_swap_id, state) = states.pop().unwrap();
|
||||
let state = match state {
|
||||
Swap::Alice(state) => state.into(),
|
||||
Swap::Bob(_) => panic!("Bob state in Alice db is unexpected"),
|
||||
};
|
||||
|
||||
assert!(
|
||||
matches!(state, AliceState::XmrRefunded),
|
||||
"Alice state is not XmrRefunded: {}",
|
||||
state
|
||||
);
|
||||
|
||||
let btc_balance_after_swap = self.alice_bitcoin_wallet.as_ref().balance().await.unwrap();
|
||||
assert_eq!(btc_balance_after_swap, self.alice_starting_balances.btc);
|
||||
@ -342,13 +325,13 @@ where
|
||||
)
|
||||
.await;
|
||||
|
||||
let db_path = tempdir().unwrap();
|
||||
let alice_db = Arc::new(Database::open(db_path.path()).unwrap());
|
||||
|
||||
let alice_params = AliceParams {
|
||||
seed: Seed::random().unwrap(),
|
||||
execution_params,
|
||||
swap_id: Uuid::new_v4(),
|
||||
bitcoin_wallet: alice_bitcoin_wallet.clone(),
|
||||
monero_wallet: alice_monero_wallet.clone(),
|
||||
db_path: tempdir().unwrap().path().to_path_buf(),
|
||||
db: alice_db.clone(),
|
||||
listen_address,
|
||||
};
|
||||
|
||||
@ -365,6 +348,22 @@ where
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut alice_event_loop = alice::EventLoop::new(
|
||||
alice_params.listen_address.clone(),
|
||||
alice_params.seed,
|
||||
alice_params.execution_params,
|
||||
alice_bitcoin_wallet.clone(),
|
||||
alice_monero_wallet.clone(),
|
||||
alice_db,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let alice_peer_id = alice_event_loop.peer_id();
|
||||
|
||||
tokio::spawn(async move {
|
||||
alice_event_loop.run().await;
|
||||
});
|
||||
|
||||
let bob_params = BobParams {
|
||||
seed: Seed::random().unwrap(),
|
||||
db_path: tempdir().unwrap().path().to_path_buf(),
|
||||
@ -372,7 +371,7 @@ where
|
||||
bitcoin_wallet: bob_bitcoin_wallet.clone(),
|
||||
monero_wallet: bob_monero_wallet.clone(),
|
||||
alice_address: alice_params.listen_address.clone(),
|
||||
alice_peer_id: alice_params.peer_id(),
|
||||
alice_peer_id,
|
||||
execution_params,
|
||||
};
|
||||
|
||||
@ -388,7 +387,7 @@ where
|
||||
bob_monero_wallet,
|
||||
};
|
||||
|
||||
testfn(test).await
|
||||
testfn(test).await;
|
||||
}
|
||||
|
||||
async fn init_containers(cli: &Cli) -> (Monero, Containers<'_>) {
|
||||
|
Loading…
Reference in New Issue
Block a user