From 143d8bc07d973e570bb328c9bf75a1bc8d860e35 Mon Sep 17 00:00:00 2001 From: rishflab Date: Mon, 14 Dec 2020 18:25:46 +1100 Subject: [PATCH] Move message0 response handler out of NB --- swap/src/alice.rs | 51 +++++++++++------- swap/src/alice/event_loop.rs | 29 +++++++++-- swap/src/alice/message0.rs | 52 +++++++++---------- swap/src/alice/{execution.rs => negotiate.rs} | 9 +++- swap/src/alice/swap.rs | 52 ++----------------- swap/src/bin/swap.rs | 7 +-- swap/src/bob.rs | 2 +- swap/src/bob/{execution.rs => negotiate.rs} | 0 swap/src/bob/swap.rs | 24 ++------- swap/src/state.rs | 26 +--------- swap/tests/testutils/mod.rs | 7 +-- 11 files changed, 108 insertions(+), 151 deletions(-) rename swap/src/alice/{execution.rs => negotiate.rs} (97%) rename swap/src/bob/{execution.rs => negotiate.rs} (100%) diff --git a/swap/src/alice.rs b/swap/src/alice.rs index f78605db..1437e1fc 100644 --- a/swap/src/alice.rs +++ b/swap/src/alice.rs @@ -17,15 +17,15 @@ use libp2p::{ NetworkBehaviour, PeerId, }; use tracing::{debug, info}; -use xmr_btc::{alice::State0, bob}; +use xmr_btc::bob; mod amounts; pub mod event_loop; -mod execution; mod message0; mod message1; mod message2; mod message3; +mod negotiate; pub mod swap; pub type Swarm = libp2p::Swarm; @@ -60,7 +60,10 @@ pub enum OutEvent { // TODO (Franck): Change this to get both amounts so parties can verify the amounts are // expected early on. Request(amounts::OutEvent), // Not-uniform with Bob on purpose, ready for adding Xmr event. - Message0(bob::Message0), + Message0 { + msg: bob::Message0, + channel: ResponseChannel, + }, Message1 { msg: bob::Message1, channel: ResponseChannel, @@ -91,7 +94,7 @@ impl From for OutEvent { impl From for OutEvent { fn from(event: message0::OutEvent) -> Self { match event { - message0::OutEvent::Msg(msg) => OutEvent::Message0(msg), + message0::OutEvent::Msg { channel, msg } => OutEvent::Message0 { msg, channel }, } } } @@ -136,20 +139,6 @@ pub struct Behaviour { } impl Behaviour { - pub fn new(state: State0) -> Self { - let identity = Keypair::generate_ed25519(); - - Self { - pt: PeerTracker::default(), - amounts: Amounts::default(), - message0: Message0::new(state), - message1: Message1::default(), - message2: Message2::default(), - message3: Message3::default(), - identity, - } - } - pub fn identity(&self) -> Keypair { self.identity.clone() } @@ -165,6 +154,16 @@ impl Behaviour { info!("Sent amounts response"); } + /// Send Message0 to Bob in response to receiving his Message0. + pub fn send_message0( + &mut self, + channel: ResponseChannel, + msg: xmr_btc::alice::Message0, + ) { + self.message0.send(channel, msg); + debug!("Sent Message0"); + } + /// Send Message1 to Bob in response to receiving his Message1. pub fn send_message1( &mut self, @@ -185,3 +184,19 @@ impl Behaviour { debug!("Sent Message2"); } } + +impl Default for Behaviour { + fn default() -> Self { + let identity = Keypair::generate_ed25519(); + + Self { + pt: PeerTracker::default(), + amounts: Amounts::default(), + message0: Message0::default(), + message1: Message1::default(), + message2: Message2::default(), + message3: Message3::default(), + identity, + } + } +} diff --git a/swap/src/alice/event_loop.rs b/swap/src/alice/event_loop.rs index cd69d641..f52f4ec1 100644 --- a/swap/src/alice/event_loop.rs +++ b/swap/src/alice/event_loop.rs @@ -30,13 +30,14 @@ impl Default for Channels { } pub struct EventLoopHandle { - msg0: Receiver, + msg0: Receiver<(bob::Message0, ResponseChannel)>, msg1: Receiver<(bob::Message1, ResponseChannel)>, msg2: Receiver<(bob::Message2, ResponseChannel)>, msg3: Receiver, request: Receiver, conn_established: Receiver, send_amounts: Sender<(ResponseChannel, SwapAmounts)>, + send_msg0: Sender<(ResponseChannel, alice::Message0)>, send_msg1: Sender<(ResponseChannel, alice::Message1)>, send_msg2: Sender<(ResponseChannel, alice::Message2)>, } @@ -49,7 +50,7 @@ impl EventLoopHandle { .ok_or_else(|| anyhow!("Failed to receive connection established from Bob")) } - pub async fn recv_message0(&mut self) -> Result { + pub async fn recv_message0(&mut self) -> Result<(bob::Message0, ResponseChannel)> { self.msg0 .recv() .await @@ -93,6 +94,15 @@ impl EventLoopHandle { Ok(()) } + pub async fn send_message0( + &mut self, + channel: ResponseChannel, + msg: alice::Message0, + ) -> Result<()> { + let _ = self.send_msg0.send((channel, msg)).await?; + Ok(()) + } + pub async fn send_message1( &mut self, channel: ResponseChannel, @@ -114,13 +124,14 @@ impl EventLoopHandle { pub struct EventLoop { swarm: libp2p::Swarm, - msg0: Sender, + msg0: Sender<(bob::Message0, ResponseChannel)>, msg1: Sender<(bob::Message1, ResponseChannel)>, msg2: Sender<(bob::Message2, ResponseChannel)>, msg3: Sender, request: Sender, conn_established: Sender, send_amounts: Receiver<(ResponseChannel, SwapAmounts)>, + send_msg0: Receiver<(ResponseChannel, alice::Message0)>, send_msg1: Receiver<(ResponseChannel, alice::Message1)>, send_msg2: Receiver<(ResponseChannel, alice::Message2)>, } @@ -149,6 +160,7 @@ impl EventLoop { let request = Channels::new(); let conn_established = Channels::new(); let send_amounts = Channels::new(); + let send_msg0 = Channels::new(); let send_msg1 = Channels::new(); let send_msg2 = Channels::new(); @@ -161,6 +173,7 @@ impl EventLoop { request: request.sender, conn_established: conn_established.sender, send_amounts: send_amounts.receiver, + send_msg0: send_msg0.receiver, send_msg1: send_msg1.receiver, send_msg2: send_msg2.receiver, }; @@ -173,6 +186,7 @@ impl EventLoop { request: request.receiver, conn_established: conn_established.receiver, send_amounts: send_amounts.sender, + send_msg0: send_msg0.sender, send_msg1: send_msg1.sender, send_msg2: send_msg2.sender, }; @@ -188,8 +202,8 @@ impl EventLoop { OutEvent::ConnectionEstablished(alice) => { let _ = self.conn_established.send(alice).await; } - OutEvent::Message0(msg) => { - let _ = self.msg0.send(msg).await; + OutEvent::Message0 { msg, channel } => { + let _ = self.msg0.send((msg, channel)).await; } OutEvent::Message1 { msg, channel } => { let _ = self.msg1.send((msg, channel)).await; @@ -210,6 +224,11 @@ impl EventLoop { self.swarm.send_amounts(channel, amounts); } }, + msg0 = self.send_msg0.next().fuse() => { + if let Some((channel, msg)) = msg0 { + self.swarm.send_message0(channel, msg); + } + }, msg1 = self.send_msg1.next().fuse() => { if let Some((channel, msg)) = msg1 { self.swarm.send_message1(channel, msg); diff --git a/swap/src/alice/message0.rs b/swap/src/alice/message0.rs index 72ac0f30..2fb8753b 100644 --- a/swap/src/alice/message0.rs +++ b/swap/src/alice/message0.rs @@ -6,7 +6,6 @@ use libp2p::{ swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, NetworkBehaviour, }; -use rand::rngs::OsRng; use std::{ collections::VecDeque, task::{Context, Poll}, @@ -15,11 +14,15 @@ use std::{ use tracing::{debug, error}; use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Message0Protocol, TIMEOUT}; -use xmr_btc::{alice::State0, bob}; +use libp2p::request_response::ResponseChannel; +use xmr_btc::bob; #[derive(Debug)] pub enum OutEvent { - Msg(bob::Message0), + Msg { + msg: bob::Message0, + channel: ResponseChannel, + }, } /// A `NetworkBehaviour` that represents send/recv of message 0. @@ -30,12 +33,28 @@ pub struct Message0 { rr: RequestResponse>, #[behaviour(ignore)] events: VecDeque, - #[behaviour(ignore)] - state: State0, } impl Message0 { - pub fn new(state: State0) -> Self { + pub fn send(&mut self, channel: ResponseChannel, msg: xmr_btc::alice::Message0) { + let msg = AliceToBob::Message0(msg); + self.rr.send_response(channel, msg); + } + fn poll( + &mut self, + _: &mut Context<'_>, + _: &mut impl PollParameters, + ) -> Poll>, OutEvent>> { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + + Poll::Pending + } +} + +impl Default for Message0 { + fn default() -> Self { let timeout = Duration::from_secs(TIMEOUT); let mut config = RequestResponseConfig::default(); config.set_request_timeout(timeout); @@ -47,21 +66,8 @@ impl Message0 { config, ), events: Default::default(), - state, } } - - fn poll( - &mut self, - _: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> { - if let Some(event) = self.events.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); - } - - Poll::Pending - } } impl NetworkBehaviourEventProcess> for Message0 { @@ -76,13 +82,7 @@ impl NetworkBehaviourEventProcess> } => { if let BobToAlice::Message0(msg) = request { debug!("Received Message0"); - // TODO(Franck): Move this business logic out of the network behaviour. - let response = AliceToBob::Message0(self.state.next_message(&mut OsRng)); - - self.rr.send_response(channel, response); - debug!("Sent Message0"); - - self.events.push_back(OutEvent::Msg(msg)); + self.events.push_back(OutEvent::Msg { msg, channel }); } } RequestResponseEvent::Message { diff --git a/swap/src/alice/execution.rs b/swap/src/alice/negotiate.rs similarity index 97% rename from swap/src/alice/execution.rs rename to swap/src/alice/negotiate.rs index 286709f6..256159cc 100644 --- a/swap/src/alice/execution.rs +++ b/swap/src/alice/negotiate.rs @@ -10,6 +10,7 @@ use futures::{ }; use libp2p::request_response::ResponseChannel; +use rand::rngs::OsRng; use sha2::Sha256; use std::{sync::Arc, time::Duration}; use tokio::time::timeout; @@ -59,7 +60,13 @@ pub async fn negotiate( .send_amounts(event.channel, amounts) .await?; - let bob_message0 = timeout(config.bob_time_to_act, event_loop_handle.recv_message0()).await??; + let (bob_message0, channel) = + timeout(config.bob_time_to_act, event_loop_handle.recv_message0()).await??; + + let alice_message0 = state0.next_message(&mut OsRng); + event_loop_handle + .send_message0(channel, alice_message0) + .await?; let state1 = state0.receive(bob_message0)?; diff --git a/swap/src/alice/swap.rs b/swap/src/alice/swap.rs index e1bbe21a..a87a07ba 100644 --- a/swap/src/alice/swap.rs +++ b/swap/src/alice/swap.rs @@ -3,7 +3,7 @@ use crate::{ alice::{ event_loop::EventLoopHandle, - execution::{ + negotiate::{ build_bitcoin_punish_transaction, build_bitcoin_redeem_transaction, extract_monero_private_key, lock_xmr, negotiate, publish_bitcoin_punish_transaction, publish_bitcoin_redeem_transaction, publish_cancel_transaction, @@ -110,29 +110,6 @@ impl fmt::Display for AliceState { impl From<&AliceState> for state::Alice { fn from(alice_state: &AliceState) -> Self { match alice_state { - AliceState::Started { - amounts, - state0: - State0 { - a, - s_a, - v_a, - refund_timelock, - punish_timelock, - redeem_address, - punish_address, - .. - }, - } => Alice::Started { - amounts: *amounts, - a: a.clone(), - s_a: *s_a, - v_a: *v_a, - refund_timelock: *refund_timelock, - punish_timelock: *punish_timelock, - redeem_address: redeem_address.clone(), - punish_address: punish_address.clone(), - }, AliceState::Negotiated { state3, .. } => Alice::Negotiated(state3.clone()), AliceState::BtcLocked { state3, .. } => Alice::BtcLocked(state3.clone()), AliceState::XmrLocked { state3 } => Alice::XmrLocked(state3.clone()), @@ -153,6 +130,10 @@ impl From<&AliceState> for state::Alice { AliceState::Cancelling { state3 } => Alice::Cancelling(state3.clone()), AliceState::Punished => Alice::SwapComplete, AliceState::SafelyAborted => Alice::SwapComplete, + // todo: we may want to swap recovering from swaps that have not been negotiated + AliceState::Started { .. } => { + panic!("Alice attempted to save swap before being negotiated") + } } } } @@ -164,29 +145,6 @@ impl TryFrom for AliceState { use AliceState::*; if let Swap::Alice(state) = db_state { let alice_state = match state { - Alice::Started { - amounts, - a, - s_a, - v_a, - refund_timelock, - punish_timelock, - redeem_address, - punish_address, - } => Started { - amounts, - state0: State0 { - a, - s_a, - v_a, - btc: amounts.btc, - xmr: amounts.xmr, - refund_timelock, - punish_timelock, - redeem_address, - punish_address, - }, - }, Alice::Negotiated(state3) => Negotiated { channel: None, amounts: SwapAmounts { diff --git a/swap/src/bin/swap.rs b/swap/src/bin/swap.rs index 801f9099..9ea1984a 100644 --- a/swap/src/bin/swap.rs +++ b/swap/src/bin/swap.rs @@ -97,11 +97,8 @@ async fn main() -> Result<()> { ); ( - AliceState::Started { - amounts, - state0: state0.clone(), - }, - alice::Behaviour::new(state0), + AliceState::Started { amounts, state0 }, + alice::Behaviour::default(), ) }; diff --git a/swap/src/bob.rs b/swap/src/bob.rs index e3d2d023..3bbf81fe 100644 --- a/swap/src/bob.rs +++ b/swap/src/bob.rs @@ -20,11 +20,11 @@ use xmr_btc::{ mod amounts; pub mod event_loop; -mod execution; mod message0; mod message1; mod message2; mod message3; +mod negotiate; pub mod swap; pub type Swarm = libp2p::Swarm; diff --git a/swap/src/bob/execution.rs b/swap/src/bob/negotiate.rs similarity index 100% rename from swap/src/bob/execution.rs rename to swap/src/bob/negotiate.rs diff --git a/swap/src/bob/swap.rs b/swap/src/bob/swap.rs index 265ae1ed..a7a26021 100644 --- a/swap/src/bob/swap.rs +++ b/swap/src/bob/swap.rs @@ -1,5 +1,5 @@ use crate::{ - bob::{event_loop::EventLoopHandle, execution::negotiate}, + bob::{event_loop::EventLoopHandle, negotiate::negotiate}, state, state::Bob, storage::Database, @@ -59,15 +59,10 @@ impl fmt::Display for BobState { impl From for state::Bob { fn from(bob_state: BobState) -> Self { match bob_state { - BobState::Started { - state0, - amounts, - addr, - } => Bob::Started { - state0, - amounts, - addr, - }, + BobState::Started { .. } => { + // TODO: Do we want to resume just started swaps + unimplemented!("Cannot save a swap that has just started") + } BobState::Negotiated(state2, peer_id) => Bob::Negotiated { state2, peer_id }, BobState::BtcLocked(state3, peer_id) => Bob::BtcLocked { state3, peer_id }, BobState::XmrLocked(state4, peer_id) => Bob::XmrLocked { state4, peer_id }, @@ -85,15 +80,6 @@ impl From for state::Bob { impl From for BobState { fn from(bob: Bob) -> Self { match bob { - Bob::Started { - state0, - amounts, - addr, - } => BobState::Started { - state0, - amounts, - addr, - }, Bob::Negotiated { state2, peer_id } => BobState::Negotiated(state2, peer_id), Bob::BtcLocked { state3, peer_id } => BobState::BtcLocked(state3, peer_id), Bob::XmrLocked { state4, peer_id } => BobState::XmrLocked(state4, peer_id), diff --git a/swap/src/state.rs b/swap/src/state.rs index 601505f6..932939fa 100644 --- a/swap/src/state.rs +++ b/swap/src/state.rs @@ -1,10 +1,7 @@ -use crate::SwapAmounts; -use libp2p::{core::Multiaddr, PeerId}; +use libp2p::PeerId; use serde::{Deserialize, Serialize}; use std::fmt::Display; -use xmr_btc::{ - alice, bitcoin::EncryptedSignature, bob, cross_curve_dleq, monero, serde::monero_private_key, -}; +use xmr_btc::{alice, bitcoin::EncryptedSignature, bob, monero, serde::monero_private_key}; #[allow(clippy::large_enum_variant)] #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] @@ -16,18 +13,6 @@ pub enum Swap { #[allow(clippy::large_enum_variant)] #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] pub enum Alice { - Started { - amounts: SwapAmounts, - // TODO: This should not be saved, instead always derive it from a seed (and that seed file - // is the only thing that has to be kept secure) - a: crate::bitcoin::SecretKey, - s_a: cross_curve_dleq::Scalar, - v_a: monero::PrivateViewKey, - refund_timelock: u32, - punish_timelock: u32, - redeem_address: ::bitcoin::Address, - punish_address: ::bitcoin::Address, - }, Negotiated(alice::State3), BtcLocked(alice::State3), XmrLocked(alice::State3), @@ -54,11 +39,6 @@ pub enum Alice { #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] pub enum Bob { - Started { - state0: bob::State0, - amounts: SwapAmounts, - addr: Multiaddr, - }, Negotiated { state2: bob::State2, #[serde(with = "crate::serde::peer_id")] @@ -108,7 +88,6 @@ impl Display for Swap { impl Display for Alice { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Alice::Started { .. } => f.write_str("Swap started"), Alice::Negotiated(_) => f.write_str("Handshake complete"), Alice::BtcLocked(_) => f.write_str("Bitcoin locked"), Alice::XmrLocked(_) => f.write_str("Monero locked"), @@ -132,7 +111,6 @@ impl Display for Bob { } Bob::BtcRedeemed(_) => f.write_str("Monero redeemable"), Bob::SwapComplete => f.write_str("Swap complete"), - Bob::Started { .. } => f.write_str("Swap started"), Bob::EncSigSent { .. } => f.write_str("Encrypted signature sent"), } } diff --git a/swap/tests/testutils/mod.rs b/swap/tests/testutils/mod.rs index 24850c99..90d92c0b 100644 --- a/swap/tests/testutils/mod.rs +++ b/swap/tests/testutils/mod.rs @@ -123,12 +123,9 @@ pub async fn init_alice_eventloop( redeem_address, punish_address, ); - let start_state = AliceState::Started { - amounts, - state0: state0.clone(), - }; + let start_state = AliceState::Started { amounts, state0 }; - let alice_behaviour = alice::Behaviour::new(state0); + let alice_behaviour = alice::Behaviour::default(); let alice_transport = build(alice_behaviour.identity()).unwrap(); let (swarm_driver, handle) =