diff --git a/Cargo.lock b/Cargo.lock index 3853cca9..ed5439a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1637,6 +1637,15 @@ dependencies = [ "wasm-timer", ] +[[package]] +name = "libp2p-async-await" +version = "0.1.0" +source = "git+https://github.com/comit-network/rust-libp2p-async-await?rev=1429cd780204624b4d244e7d8179fe6ff77988c3#1429cd780204624b4d244e7d8179fe6ff77988c3" +dependencies = [ + "libp2p", + "log", +] + [[package]] name = "libp2p-core" version = "0.27.0" @@ -3415,6 +3424,7 @@ dependencies = [ "get-port", "hyper", "libp2p", + "libp2p-async-await", "log", "miniscript", "monero", diff --git a/docs/sequence.puml b/docs/sequence.puml index 11638d4b..3a59aeb4 100644 --- a/docs/sequence.puml +++ b/docs/sequence.puml @@ -20,22 +20,22 @@ end group Execution Setup group Phase A [Messages can be exchanged in any order] - Bob -> Alice: bob::Message0 + Bob -> Alice: Message0 note left: Pubkeys\ndleq proof s_b\nxmr viewkey v_b\nbtc refund addr - Alice -> Bob: alice::Message0 + Alice -> Bob: Message1 note right: Pubkeys\ndleq proof s_a\nxmr view key v_a\nbtc redeem addr\nbtc punish addr end group Phase B [Messages must be exchanged in the given order] - Bob -> Alice: Message1 + Bob -> Alice: Message2 note left: unsigned btc lock tx - Alice -> Bob: Message2 + Alice -> Bob: Message3 note right: btc cancel tx sig\nbtc refund tx enc sig S_b - Bob -> Alice: Message3 + Bob -> Alice: Message4 note left: btc punish tx sig\nbtc cancel tx sig end diff --git a/monero-harness/rustfmt.toml b/monero-harness/rustfmt.toml index 100f5106..d973e35c 100644 --- a/monero-harness/rustfmt.toml +++ b/monero-harness/rustfmt.toml @@ -1,7 +1,7 @@ edition = "2018" condense_wildcard_suffixes = true format_macro_matchers = true -merge_imports = true +imports_granularity = "Crate" use_field_init_shorthand = true format_code_in_doc_comments = true normalize_comments = true diff --git a/monero-harness/src/image.rs b/monero-harness/src/image.rs index a491a345..81cc8723 100644 --- a/monero-harness/src/image.rs +++ b/monero-harness/src/image.rs @@ -303,14 +303,11 @@ impl IntoIterator for Args { type IntoIter = ::std::vec::IntoIter; fn into_iter(self) -> ::IntoIter { - let mut args = Vec::new(); - - args.push("/bin/bash".into()); - args.push("-c".into()); - - let cmd = format!("{} ", self.image_args.args()); - args.push(cmd); - - args.into_iter() + vec![ + "/bin/bash".to_string(), + "-c".to_string(), + format!("{} ", self.image_args.args()), + ] + .into_iter() } } diff --git a/rust-toolchain b/rust-toolchain index a66b2431..97e6da5f 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -nightly-2020-08-13 +nightly-2021-01-31 diff --git a/rustfmt.toml b/rustfmt.toml index 100f5106..d973e35c 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,7 +1,7 @@ edition = "2018" condense_wildcard_suffixes = true format_macro_matchers = true -merge_imports = true +imports_granularity = "Crate" use_field_init_shorthand = true format_code_in_doc_comments = true normalize_comments = true diff --git a/swap/Cargo.toml b/swap/Cargo.toml index 94eda471..85609dcc 100644 --- a/swap/Cargo.toml +++ b/swap/Cargo.toml @@ -25,6 +25,7 @@ ecdsa_fun = { git = "https://github.com/LLFourn/secp256kfun", rev = "cdfbc766045 ed25519-dalek = { version = "1.0.0-pre.4", features = ["serde"] }# Cannot be 1 because they depend on curve25519-dalek version 3 futures = { version = "0.3", default-features = false } libp2p = { version = "0.34", default-features = false, features = ["tcp-tokio", "yamux", "mplex", "dns", "noise", "request-response"] } +libp2p-async-await = { git = "https://github.com/comit-network/rust-libp2p-async-await", rev = "1429cd780204624b4d244e7d8179fe6ff77988c3" } log = { version = "0.4", features = ["serde"] } miniscript = { version = "4", features = ["serde"] } monero = { version = "0.9", features = ["serde_support"] } diff --git a/swap/src/bitcoin/wallet.rs b/swap/src/bitcoin/wallet.rs index 793394ae..584fe5cc 100644 --- a/swap/src/bitcoin/wallet.rs +++ b/swap/src/bitcoin/wallet.rs @@ -157,8 +157,7 @@ impl TransactionBlockHeight for Wallet { .await .map_err(|_| backoff::Error::Transient(Error::Io))?; - let block_height = - block_height.ok_or_else(|| backoff::Error::Transient(Error::NotYetMined))?; + let block_height = block_height.ok_or(backoff::Error::Transient(Error::NotYetMined))?; Result::<_, backoff::Error>::Ok(block_height) }) diff --git a/swap/src/config/seed.rs b/swap/src/config/seed.rs index 371225e3..ef6382f7 100644 --- a/swap/src/config/seed.rs +++ b/swap/src/config/seed.rs @@ -17,7 +17,7 @@ impl Seed { Ok(Seed(seed::Seed::random()?)) } - pub fn from_file_or_generate(data_dir: &PathBuf) -> Result { + pub fn from_file_or_generate(data_dir: &Path) -> Result { let file_path_buf = data_dir.join("seed.pem"); let file_path = Path::new(&file_path_buf); diff --git a/swap/src/network/request_response.rs b/swap/src/network/request_response.rs index f0a01664..12cd3dc0 100644 --- a/swap/src/network/request_response.rs +++ b/swap/src/network/request_response.rs @@ -1,67 +1,21 @@ -use crate::protocol::{alice, alice::TransferProof, bob, bob::EncryptedSignature}; use async_trait::async_trait; use futures::prelude::*; use libp2p::{ core::{upgrade, upgrade::ReadOneError}, request_response::{ProtocolName, RequestResponseCodec}, }; -use serde::{Deserialize, Serialize}; +use serde::{de::DeserializeOwned, Serialize}; use std::{fmt::Debug, io, marker::PhantomData}; /// Time to wait for a response back once we send a request. pub const TIMEOUT: u64 = 3600; // One hour. /// Message receive buffer. -const BUF_SIZE: usize = 1024 * 1024; - -// TODO: Think about whether there is a better way to do this, e.g., separate -// Codec for each Message and a macro that implements them. - -/// Messages Bob sends to Alice. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum BobToAlice { - SwapRequest(Box), - Message0(Box), - Message1(Box), - Message2(Box), -} - -/// Messages Alice sends to Bob. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum AliceToBob { - SwapResponse(Box), - Message0(Box), - Message1(Box), - Message2, -} - -/// Messages sent from one party to the other. -/// All responses are empty -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum Request { - TransferProof(Box), - EncryptedSignature(Box), -} - -#[derive(Clone, Copy, Debug, Serialize, Deserialize)] -/// Response are only used for acknowledgement purposes. -pub enum Response { - TransferProof, - EncryptedSignature, -} +pub const BUF_SIZE: usize = 1024 * 1024; #[derive(Debug, Clone, Copy, Default)] pub struct Swap; -#[derive(Debug, Clone, Copy, Default)] -pub struct Message0Protocol; - -#[derive(Debug, Clone, Copy, Default)] -pub struct Message1Protocol; - -#[derive(Debug, Clone, Copy, Default)] -pub struct Message2Protocol; - #[derive(Debug, Clone, Copy, Default)] pub struct TransferProofProtocol; @@ -70,139 +24,45 @@ pub struct EncryptedSignatureProtocol; impl ProtocolName for Swap { fn protocol_name(&self) -> &[u8] { - b"/xmr/btc/swap/1.0.0" - } -} - -impl ProtocolName for Message0Protocol { - fn protocol_name(&self) -> &[u8] { - b"/xmr/btc/message0/1.0.0" - } -} - -impl ProtocolName for Message1Protocol { - fn protocol_name(&self) -> &[u8] { - b"/xmr/btc/message1/1.0.0" - } -} - -impl ProtocolName for Message2Protocol { - fn protocol_name(&self) -> &[u8] { - b"/xmr/btc/message2/1.0.0" + b"/comit/xmr/btc/swap/1.0.0" } } impl ProtocolName for TransferProofProtocol { fn protocol_name(&self) -> &[u8] { - b"/xmr/btc/transfer_proof/1.0.0" + b"/comit/xmr/btc/transfer_proof/1.0.0" } } impl ProtocolName for EncryptedSignatureProtocol { fn protocol_name(&self) -> &[u8] { - b"/xmr/btc/encrypted_signature/1.0.0" + b"/comit/xmr/btc/encrypted_signature/1.0.0" } } -#[derive(Clone, Copy, Debug, Default)] -pub struct Codec

{ - phantom: PhantomData

, +#[derive(Clone, Copy, Debug)] +pub struct CborCodec { + phantom: PhantomData<(P, Req, Res)>, +} + +impl Default for CborCodec { + fn default() -> Self { + Self { + phantom: PhantomData::default(), + } + } } #[async_trait] -impl

RequestResponseCodec for Codec

+impl RequestResponseCodec for CborCodec where - P: Send + Sync + Clone + ProtocolName, + P: ProtocolName + Send + Sync + Clone, + Req: DeserializeOwned + Serialize + Send, + Res: DeserializeOwned + Serialize + Send, { type Protocol = P; - type Request = BobToAlice; - type Response = AliceToBob; - - async fn read_request(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - let message = upgrade::read_one(io, BUF_SIZE) - .await - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - let mut de = serde_cbor::Deserializer::from_slice(&message); - let msg = BobToAlice::deserialize(&mut de).map_err(|e| { - tracing::debug!("serde read_request error: {:?}", e); - io::Error::new(io::ErrorKind::Other, e) - })?; - - Ok(msg) - } - - async fn read_response( - &mut self, - _: &Self::Protocol, - io: &mut T, - ) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - let message = upgrade::read_one(io, BUF_SIZE) - .await - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - let mut de = serde_cbor::Deserializer::from_slice(&message); - let msg = AliceToBob::deserialize(&mut de).map_err(|e| { - tracing::debug!("serde read_response error: {:?}", e); - io::Error::new(io::ErrorKind::InvalidData, e) - })?; - - Ok(msg) - } - - async fn write_request( - &mut self, - _: &Self::Protocol, - io: &mut T, - req: Self::Request, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - let bytes = - serde_cbor::to_vec(&req).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; - - upgrade::write_one(io, &bytes).await?; - - Ok(()) - } - - async fn write_response( - &mut self, - _: &Self::Protocol, - io: &mut T, - res: Self::Response, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - let bytes = serde_cbor::to_vec(&res).map_err(|e| { - tracing::debug!("serde write_reponse error: {:?}", e); - io::Error::new(io::ErrorKind::InvalidData, e) - })?; - upgrade::write_one(io, &bytes).await?; - - Ok(()) - } -} - -#[derive(Clone, Copy, Debug, Default)] -pub struct OneShotCodec

{ - phantom: PhantomData

, -} - -#[async_trait] -impl

RequestResponseCodec for OneShotCodec

-where - P: Send + Sync + Clone + ProtocolName, -{ - type Protocol = P; - type Request = Request; - type Response = Response; + type Request = Req; + type Response = Res; async fn read_request(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result where @@ -213,7 +73,7 @@ where e => io::Error::new(io::ErrorKind::Other, e), })?; let mut de = serde_cbor::Deserializer::from_slice(&message); - let msg = Request::deserialize(&mut de).map_err(|e| { + let msg = Req::deserialize(&mut de).map_err(|e| { tracing::debug!("serde read_request error: {:?}", e); io::Error::new(io::ErrorKind::Other, e) })?; @@ -233,7 +93,7 @@ where .await .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; let mut de = serde_cbor::Deserializer::from_slice(&message); - let msg = Response::deserialize(&mut de).map_err(|e| { + let msg = Res::deserialize(&mut de).map_err(|e| { tracing::debug!("serde read_response error: {:?}", e); io::Error::new(io::ErrorKind::InvalidData, e) })?; @@ -268,7 +128,7 @@ where T: AsyncWrite + Unpin + Send, { let bytes = serde_cbor::to_vec(&res).map_err(|e| { - tracing::debug!("serde write_reponse error: {:?}", e); + tracing::debug!("serde write_response error: {:?}", e); io::Error::new(io::ErrorKind::InvalidData, e) })?; upgrade::write_one(io, &bytes).await?; diff --git a/swap/src/protocol/alice.rs b/swap/src/protocol/alice.rs index 20bf002f..9f741412 100644 --- a/swap/src/protocol/alice.rs +++ b/swap/src/protocol/alice.rs @@ -1,14 +1,5 @@ //! Run an XMR/BTC swap in the role of Alice. //! Alice holds XMR and wishes receive BTC. -pub use self::{ - event_loop::{EventLoop, EventLoopHandle}, - message0::Message0, - message1::Message1, - state::*, - swap::{run, run_until}, - swap_response::*, - transfer_proof::TransferProof, -}; use crate::{ bitcoin, database, database::Database, @@ -16,14 +7,13 @@ use crate::{ monero, network::{ peer_tracker::{self, PeerTracker}, - request_response::AliceToBob, transport::build, Seed as NetworkSeed, }, - protocol::{bob, bob::EncryptedSignature, SwapAmounts}, + protocol::{bob::EncryptedSignature, SwapAmounts}, seed::Seed, }; -use anyhow::{bail, Result}; +use anyhow::{bail, Error, Result}; use libp2p::{ core::Multiaddr, identity::Keypair, request_response::ResponseChannel, NetworkBehaviour, PeerId, }; @@ -32,11 +22,20 @@ use std::{path::PathBuf, sync::Arc}; use tracing::{debug, info}; use uuid::Uuid; +pub use self::{ + event_loop::{EventLoop, EventLoopHandle}, + execution_setup::Message1, + state::*, + swap::{run, run_until}, + swap_response::*, + transfer_proof::TransferProof, +}; +use crate::protocol::bob::SwapRequest; +pub use execution_setup::Message3; + mod encrypted_signature; pub mod event_loop; -mod message0; -mod message1; -mod message2; +mod execution_setup; pub mod state; mod steps; pub mod swap; @@ -198,6 +197,7 @@ impl Builder { self.execution_params.bitcoin_punish_timelock, redeem_address, punish_address, + rng, ); Ok(AliceState::Started { amounts, state0 }) @@ -218,25 +218,18 @@ impl Builder { #[derive(Debug)] pub enum OutEvent { ConnectionEstablished(PeerId), - // TODO (Franck): Change this to get both amounts so parties can verify the amounts are - // expected early on. - Request(Box), /* Not-uniform with Bob on purpose, ready for adding - * Xmr - * event. */ - Message0 { - msg: Box, - channel: ResponseChannel, - }, - Message1 { - msg: bob::Message1, - channel: ResponseChannel, - }, - Message2 { - msg: Box, - bob_peer_id: PeerId, + SwapRequest { + msg: SwapRequest, + channel: ResponseChannel, }, + ExecutionSetupDone(Result>), TransferProofAcknowledged, - EncryptedSignature(EncryptedSignature), + EncryptedSignature { + msg: Box, + channel: ResponseChannel<()>, + }, + ResponseSent, // Same variant is used for all messages as no processing is done + Failure(Error), } impl From for OutEvent { @@ -251,52 +244,43 @@ impl From for OutEvent { impl From for OutEvent { fn from(event: swap_response::OutEvent) -> Self { - OutEvent::Request(Box::new(event)) - } -} - -impl From for OutEvent { - fn from(event: message0::OutEvent) -> Self { + use swap_response::OutEvent::*; match event { - message0::OutEvent::Msg { channel, msg } => OutEvent::Message0 { - msg: Box::new(msg), - channel, - }, + MsgReceived { msg, channel } => OutEvent::SwapRequest { msg, channel }, + ResponseSent => OutEvent::ResponseSent, + Failure(err) => OutEvent::Failure(err.context("Swap Request/Response failure")), } } } -impl From for OutEvent { - fn from(event: message1::OutEvent) -> Self { +impl From for OutEvent { + fn from(event: execution_setup::OutEvent) -> Self { match event { - message1::OutEvent::Msg { msg, channel } => OutEvent::Message1 { msg, channel }, - } - } -} - -impl From for OutEvent { - fn from(event: message2::OutEvent) -> Self { - match event { - message2::OutEvent::Msg { msg, bob_peer_id } => OutEvent::Message2 { - msg: Box::new(msg), - bob_peer_id, - }, + execution_setup::OutEvent::Done(res) => OutEvent::ExecutionSetupDone(res.map(Box::new)), } } } impl From for OutEvent { fn from(event: transfer_proof::OutEvent) -> Self { + use transfer_proof::OutEvent::*; match event { - transfer_proof::OutEvent::Acknowledged => OutEvent::TransferProofAcknowledged, + Acknowledged => OutEvent::TransferProofAcknowledged, + Failure(err) => OutEvent::Failure(err.context("Failure with Transfer Proof")), } } } impl From for OutEvent { fn from(event: encrypted_signature::OutEvent) -> Self { + use encrypted_signature::OutEvent::*; match event { - encrypted_signature::OutEvent::Msg(msg) => OutEvent::EncryptedSignature(msg), + MsgReceived { msg, channel } => OutEvent::EncryptedSignature { + msg: Box::new(msg), + channel, + }, + AckSent => OutEvent::ResponseSent, + Failure(err) => OutEvent::Failure(err.context("Failure with Encrypted Signature")), } } } @@ -308,9 +292,7 @@ impl From for OutEvent { pub struct Behaviour { pt: PeerTracker, amounts: swap_response::Behaviour, - message0: message0::Behaviour, - message1: message1::Behaviour, - message2: message2::Behaviour, + execution_setup: execution_setup::Behaviour, transfer_proof: transfer_proof::Behaviour, encrypted_signature: encrypted_signature::Behaviour, } @@ -319,7 +301,7 @@ impl Behaviour { /// Alice always sends her messages as a response to a request from Bob. pub fn send_swap_response( &mut self, - channel: ResponseChannel, + channel: ResponseChannel, swap_response: SwapResponse, ) -> Result<()> { self.amounts.send(channel, swap_response)?; @@ -327,26 +309,9 @@ impl Behaviour { Ok(()) } - /// Send Message0 to Bob in response to receiving his Message0. - pub fn send_message0( - &mut self, - channel: ResponseChannel, - msg: Message0, - ) -> Result<()> { - self.message0.send(channel, msg)?; - debug!("Sent Message0"); - Ok(()) - } - - /// Send Message1 to Bob in response to receiving his Message1. - pub fn send_message1( - &mut self, - channel: ResponseChannel, - msg: Message1, - ) -> Result<()> { - self.message1.send(channel, msg)?; - debug!("Sent Message1"); - Ok(()) + pub fn start_execution_setup(&mut self, bob_peer_id: PeerId, state0: State0) { + self.execution_setup.run(bob_peer_id, state0); + info!("Start execution setup with {}", bob_peer_id); } /// Send Transfer Proof to Bob. diff --git a/swap/src/protocol/alice/encrypted_signature.rs b/swap/src/protocol/alice/encrypted_signature.rs index 2b66dcd5..06fc1bb7 100644 --- a/swap/src/protocol/alice/encrypted_signature.rs +++ b/swap/src/protocol/alice/encrypted_signature.rs @@ -1,53 +1,42 @@ use crate::{ - network::request_response::{ - EncryptedSignatureProtocol, OneShotCodec, Request, Response, TIMEOUT, - }, + network::request_response::{CborCodec, EncryptedSignatureProtocol, TIMEOUT}, protocol::bob::EncryptedSignature, }; +use anyhow::{anyhow, Error, Result}; use libp2p::{ request_response::{ - handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, - RequestResponseEvent, RequestResponseMessage, + ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, + RequestResponseMessage, ResponseChannel, }, - swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, NetworkBehaviour, }; -use std::{ - collections::VecDeque, - task::{Context, Poll}, - time::Duration, -}; -use tracing::{debug, error}; +use std::time::Duration; +use tracing::debug; #[derive(Debug)] pub enum OutEvent { - Msg(EncryptedSignature), + MsgReceived { + msg: EncryptedSignature, + channel: ResponseChannel<()>, + }, + AckSent, + Failure(Error), } /// A `NetworkBehaviour` that represents receiving the Bitcoin encrypted /// signature from Bob. #[derive(NetworkBehaviour)] -#[behaviour(out_event = "OutEvent", poll_method = "poll")] +#[behaviour(out_event = "OutEvent", event_process = false)] #[allow(missing_debug_implementations)] pub struct Behaviour { - rr: RequestResponse>, - #[behaviour(ignore)] - events: VecDeque, + rr: RequestResponse>, } impl Behaviour { - fn poll( - &mut self, - _: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll< - NetworkBehaviourAction>, OutEvent>, - > { - if let Some(event) = self.events.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); - } - - Poll::Pending + pub fn send_ack(&mut self, channel: ResponseChannel<()>) -> Result<()> { + self.rr + .send_response(channel, ()) + .map_err(|err| anyhow!("Failed to ack encrypted signature: {:?}", err)) } } @@ -59,48 +48,42 @@ impl Default for Behaviour { Self { rr: RequestResponse::new( - OneShotCodec::default(), + CborCodec::default(), vec![(EncryptedSignatureProtocol, ProtocolSupport::Inbound)], config, ), - events: Default::default(), } } } -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { +impl From> for OutEvent { + fn from(event: RequestResponseEvent) -> Self { match event { RequestResponseEvent::Message { + peer, message: RequestResponseMessage::Request { request, channel, .. }, .. } => { - if let Request::EncryptedSignature(msg) = request { - debug!("Received encrypted signature"); - self.events.push_back(OutEvent::Msg(*msg)); - // Send back empty response so that the request/response protocol completes. - if let Err(error) = self.rr.send_response(channel, Response::EncryptedSignature) - { - error!("Failed to send Encrypted Signature ack: {:?}", error); - } + debug!("Received encrypted signature from {}", peer); + OutEvent::MsgReceived { + msg: request, + channel, } } RequestResponseEvent::Message { message: RequestResponseMessage::Response { .. }, .. - } => panic!("Alice should not get a Response"), + } => OutEvent::Failure(anyhow!("Alice should not get a Response")), RequestResponseEvent::InboundFailure { error, .. } => { - error!("Inbound failure: {:?}", error); + OutEvent::Failure(anyhow!("Inbound failure: {:?}", error)) } RequestResponseEvent::OutboundFailure { error, .. } => { - error!("Outbound failure: {:?}", error); - } - RequestResponseEvent::ResponseSent { .. } => { - debug!("Alice has sent an Message3 response to Bob"); + OutEvent::Failure(anyhow!("Outbound failure: {:?}", error)) } + RequestResponseEvent::ResponseSent { .. } => OutEvent::AckSent, } } } diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 9e28934a..26efd22b 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -1,10 +1,8 @@ use crate::{ - network::{request_response::AliceToBob, transport::SwapTransport, TokioExecutor}, + network::{transport::SwapTransport, TokioExecutor}, protocol::{ - alice, - alice::{Behaviour, OutEvent, SwapResponse, TransferProof}, - bob, - bob::EncryptedSignature, + alice::{Behaviour, OutEvent, State0, State3, SwapResponse, TransferProof}, + bob::{EncryptedSignature, SwapRequest}, }, }; use anyhow::{anyhow, Context, Result}; @@ -35,15 +33,12 @@ impl Default for Channels { #[derive(Debug)] pub struct EventLoopHandle { - recv_message0: Receiver<(bob::Message0, ResponseChannel)>, - recv_message1: Receiver<(bob::Message1, ResponseChannel)>, - recv_message2: Receiver, + done_execution_setup: Receiver>, recv_encrypted_signature: Receiver, - request: Receiver, + recv_swap_request: Receiver<(SwapRequest, ResponseChannel)>, conn_established: Receiver, - send_swap_response: Sender<(ResponseChannel, SwapResponse)>, - send_message0: Sender<(ResponseChannel, alice::Message0)>, - send_message1: Sender<(ResponseChannel, alice::Message1)>, + send_swap_response: Sender<(ResponseChannel, SwapResponse)>, + start_execution_setup: Sender<(PeerId, State0)>, send_transfer_proof: Sender<(PeerId, TransferProof)>, recv_transfer_proof_ack: Receiver<()>, } @@ -56,25 +51,16 @@ impl EventLoopHandle { .ok_or_else(|| anyhow!("Failed to receive connection established from Bob")) } - pub async fn recv_message0(&mut self) -> Result<(bob::Message0, ResponseChannel)> { - self.recv_message0 - .recv() - .await - .ok_or_else(|| anyhow!("Failed to receive message 0 from Bob")) - } + pub async fn execution_setup(&mut self, bob_peer_id: PeerId, state0: State0) -> Result { + let _ = self + .start_execution_setup + .send((bob_peer_id, state0)) + .await?; - pub async fn recv_message1(&mut self) -> Result<(bob::Message1, ResponseChannel)> { - self.recv_message1 + self.done_execution_setup .recv() .await - .ok_or_else(|| anyhow!("Failed to receive message 1 from Bob")) - } - - pub async fn recv_message2(&mut self) -> Result { - self.recv_message2 - .recv() - .await - .ok_or_else(|| anyhow!("Failed to receive message 2 from Bob")) + .ok_or_else(|| anyhow!("Failed to setup execution with Bob"))? } pub async fn recv_encrypted_signature(&mut self) -> Result { @@ -84,10 +70,10 @@ impl EventLoopHandle { .ok_or_else(|| anyhow!("Failed to receive Bitcoin encrypted signature from Bob")) } - pub async fn recv_request( + pub async fn recv_swap_request( &mut self, - ) -> Result { - self.request + ) -> Result<(SwapRequest, ResponseChannel)> { + self.recv_swap_request .recv() .await .ok_or_else(|| anyhow!("Failed to receive amounts request from Bob")) @@ -95,7 +81,7 @@ impl EventLoopHandle { pub async fn send_swap_response( &mut self, - channel: ResponseChannel, + channel: ResponseChannel, swap_response: SwapResponse, ) -> Result<()> { let _ = self @@ -105,24 +91,6 @@ impl EventLoopHandle { Ok(()) } - pub async fn send_message0( - &mut self, - channel: ResponseChannel, - msg: alice::Message0, - ) -> Result<()> { - let _ = self.send_message0.send((channel, msg)).await?; - Ok(()) - } - - pub async fn send_message1( - &mut self, - channel: ResponseChannel, - msg: alice::Message1, - ) -> Result<()> { - let _ = self.send_message1.send((channel, msg)).await?; - Ok(()) - } - pub async fn send_transfer_proof(&mut self, bob: PeerId, msg: TransferProof) -> Result<()> { let _ = self.send_transfer_proof.send((bob, msg)).await?; @@ -137,15 +105,12 @@ impl EventLoopHandle { #[allow(missing_debug_implementations)] pub struct EventLoop { swarm: libp2p::Swarm, - recv_message0: Sender<(bob::Message0, ResponseChannel)>, - recv_message1: Sender<(bob::Message1, ResponseChannel)>, - recv_message2: Sender, + start_execution_setup: Receiver<(PeerId, State0)>, + done_execution_setup: Sender>, recv_encrypted_signature: Sender, - request: Sender, + recv_swap_request: Sender<(SwapRequest, ResponseChannel)>, conn_established: Sender, - send_swap_response: Receiver<(ResponseChannel, SwapResponse)>, - send_message0: Receiver<(ResponseChannel, alice::Message0)>, - send_message1: Receiver<(ResponseChannel, alice::Message1)>, + send_swap_response: Receiver<(ResponseChannel, SwapResponse)>, send_transfer_proof: Receiver<(PeerId, TransferProof)>, recv_transfer_proof_ack: Sender<()>, } @@ -166,43 +131,34 @@ impl EventLoop { Swarm::listen_on(&mut swarm, listen.clone()) .with_context(|| format!("Address is not supported: {:#}", listen))?; - let recv_message0 = Channels::new(); - let recv_message1 = Channels::new(); - let recv_message2 = Channels::new(); + let start_execution_setup = Channels::new(); + let done_execution_setup = Channels::new(); let recv_encrypted_signature = Channels::new(); let request = Channels::new(); let conn_established = Channels::new(); let send_swap_response = Channels::new(); - let send_message0 = Channels::new(); - let send_message1 = Channels::new(); let send_transfer_proof = Channels::new(); let recv_transfer_proof_ack = Channels::new(); let driver = EventLoop { swarm, - recv_message0: recv_message0.sender, - recv_message1: recv_message1.sender, - recv_message2: recv_message2.sender, + start_execution_setup: start_execution_setup.receiver, + done_execution_setup: done_execution_setup.sender, recv_encrypted_signature: recv_encrypted_signature.sender, - request: request.sender, + recv_swap_request: request.sender, conn_established: conn_established.sender, send_swap_response: send_swap_response.receiver, - send_message0: send_message0.receiver, - send_message1: send_message1.receiver, send_transfer_proof: send_transfer_proof.receiver, recv_transfer_proof_ack: recv_transfer_proof_ack.sender, }; let handle = EventLoopHandle { - recv_message0: recv_message0.receiver, - recv_message1: recv_message1.receiver, - recv_message2: recv_message2.receiver, + start_execution_setup: start_execution_setup.sender, + done_execution_setup: done_execution_setup.receiver, recv_encrypted_signature: recv_encrypted_signature.receiver, - request: request.receiver, + recv_swap_request: request.receiver, conn_established: conn_established.receiver, send_swap_response: send_swap_response.sender, - send_message0: send_message0.sender, - send_message1: send_message1.sender, send_transfer_proof: send_transfer_proof.sender, recv_transfer_proof_ack: recv_transfer_proof_ack.receiver, }; @@ -218,24 +174,26 @@ impl EventLoop { OutEvent::ConnectionEstablished(alice) => { let _ = self.conn_established.send(alice).await; } - OutEvent::Message0 { msg, channel } => { - let _ = self.recv_message0.send((*msg, channel)).await; + OutEvent::SwapRequest { msg, channel } => { + let _ = self.recv_swap_request.send((msg, channel)).await; } - OutEvent::Message1 { msg, channel } => { - let _ = self.recv_message1.send((msg, channel)).await; - } - OutEvent::Message2 { msg, bob_peer_id : _} => { - let _ = self.recv_message2.send(*msg).await; + OutEvent::ExecutionSetupDone(res) => { + let _ = self.done_execution_setup.send(res.map(|state|*state)).await; } OutEvent::TransferProofAcknowledged => { trace!("Bob acknowledged transfer proof"); let _ = self.recv_transfer_proof_ack.send(()).await; } - OutEvent::EncryptedSignature(msg) => { - let _ = self.recv_encrypted_signature.send(msg).await; + OutEvent::EncryptedSignature{ msg, channel } => { + let _ = self.recv_encrypted_signature.send(*msg).await; + // Send back empty response so that the request/response protocol completes. + if let Err(error) = self.swarm.encrypted_signature.send_ack(channel) { + error!("Failed to send Encrypted Signature ack: {:?}", error); + } } - OutEvent::Request(event) => { - let _ = self.request.send(*event).await; + OutEvent::ResponseSent => {} + OutEvent::Failure(err) => { + error!("Communication error: {:#}", err); } } }, @@ -247,20 +205,11 @@ impl EventLoop { .map_err(|err|error!("Failed to send swap response: {:#}", err)); } }, - msg0 = self.send_message0.recv().fuse() => { - if let Some((channel, msg)) = msg0 { + option = self.start_execution_setup.recv().fuse() => { + if let Some((bob_peer_id, state0)) = option { let _ = self .swarm - .send_message0(channel, msg) - .map_err(|err|error!("Failed to send message0: {:#}", err)); - } - }, - msg1 = self.send_message1.recv().fuse() => { - if let Some((channel, msg)) = msg1 { - let _ = self - .swarm - .send_message1(channel, msg) - .map_err(|err|error!("Failed to send message1: {:#}", err)); + .start_execution_setup(bob_peer_id, state0); } }, transfer_proof = self.send_transfer_proof.recv().fuse() => { diff --git a/swap/src/protocol/alice/execution_setup.rs b/swap/src/protocol/alice/execution_setup.rs new file mode 100644 index 00000000..967ba26f --- /dev/null +++ b/swap/src/protocol/alice/execution_setup.rs @@ -0,0 +1,98 @@ +use crate::{ + bitcoin, + bitcoin::{EncryptedSignature, Signature}, + monero, + network::request_response::BUF_SIZE, + protocol::{ + alice::{State0, State3}, + bob::{Message0, Message2, Message4}, + }, +}; +use anyhow::{Context, Error, Result}; +use libp2p::PeerId; +use libp2p_async_await::BehaviourOutEvent; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Message1 { + pub(crate) A: bitcoin::PublicKey, + pub(crate) S_a_monero: monero::PublicKey, + pub(crate) S_a_bitcoin: bitcoin::PublicKey, + pub(crate) dleq_proof_s_a: cross_curve_dleq::Proof, + pub(crate) v_a: monero::PrivateViewKey, + pub(crate) redeem_address: bitcoin::Address, + pub(crate) punish_address: bitcoin::Address, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Message3 { + pub(crate) tx_cancel_sig: Signature, + pub(crate) tx_refund_encsig: EncryptedSignature, +} + +#[derive(Debug)] +pub enum OutEvent { + Done(Result), +} + +impl From> for OutEvent { + fn from(event: BehaviourOutEvent) -> Self { + match event { + BehaviourOutEvent::Inbound(_, Ok(State3)) => OutEvent::Done(Ok(State3)), + BehaviourOutEvent::Inbound(_, Err(e)) => OutEvent::Done(Err(e)), + BehaviourOutEvent::Outbound(..) => unreachable!("Alice only supports inbound"), + } + } +} + +#[derive(libp2p::NetworkBehaviour)] +#[behaviour(out_event = "OutEvent", event_process = false)] +pub struct Behaviour { + inner: libp2p_async_await::Behaviour, +} + +impl Default for Behaviour { + fn default() -> Self { + Self { + inner: libp2p_async_await::Behaviour::new(b"/comit/xmr/btc/execution_setup/1.0.0"), + } + } +} + +impl Behaviour { + pub fn run(&mut self, bob: PeerId, state0: State0) { + self.inner + .do_protocol_listener(bob, move |mut substream| async move { + let message0 = + serde_cbor::from_slice::(&substream.read_message(BUF_SIZE).await?) + .context("failed to deserialize message0")?; + let state1 = state0.receive(message0)?; + + substream + .write_message( + &serde_cbor::to_vec(&state1.next_message()) + .context("failed to serialize message1")?, + ) + .await?; + + let message2 = + serde_cbor::from_slice::(&substream.read_message(BUF_SIZE).await?) + .context("failed to deserialize message2")?; + let state2 = state1.receive(message2); + + substream + .write_message( + &serde_cbor::to_vec(&state2.next_message()) + .context("failed to serialize message3")?, + ) + .await?; + + let message4 = + serde_cbor::from_slice::(&substream.read_message(BUF_SIZE).await?) + .context("failed to deserialize message4")?; + let state3 = state2.receive(message4)?; + + Ok(state3) + }) + } +} diff --git a/swap/src/protocol/alice/message0.rs b/swap/src/protocol/alice/message0.rs deleted file mode 100644 index a2c42304..00000000 --- a/swap/src/protocol/alice/message0.rs +++ /dev/null @@ -1,119 +0,0 @@ -use crate::{ - bitcoin, monero, - network::request_response::{AliceToBob, BobToAlice, Codec, Message0Protocol, TIMEOUT}, - protocol::bob, -}; -use anyhow::{anyhow, Result}; -use libp2p::{ - request_response::{ - handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, - RequestResponseEvent, RequestResponseMessage, ResponseChannel, - }, - swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, - NetworkBehaviour, -}; -use serde::{Deserialize, Serialize}; -use std::{ - collections::VecDeque, - task::{Context, Poll}, - time::Duration, -}; -use tracing::{debug, error}; - -#[derive(Debug)] -pub enum OutEvent { - Msg { - msg: bob::Message0, - channel: ResponseChannel, - }, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Message0 { - pub(crate) A: bitcoin::PublicKey, - pub(crate) S_a_monero: monero::PublicKey, - pub(crate) S_a_bitcoin: bitcoin::PublicKey, - pub(crate) dleq_proof_s_a: cross_curve_dleq::Proof, - pub(crate) v_a: monero::PrivateViewKey, - pub(crate) redeem_address: bitcoin::Address, - pub(crate) punish_address: bitcoin::Address, -} - -/// A `NetworkBehaviour` that represents send/recv of message 0. -#[derive(NetworkBehaviour)] -#[behaviour(out_event = "OutEvent", poll_method = "poll")] -#[allow(missing_debug_implementations)] -pub struct Behaviour { - rr: RequestResponse>, - #[behaviour(ignore)] - events: VecDeque, -} - -impl Behaviour { - pub fn send(&mut self, channel: ResponseChannel, msg: Message0) -> Result<()> { - let msg = AliceToBob::Message0(Box::new(msg)); - self.rr - .send_response(channel, msg) - .map_err(|alice_to_bob| anyhow!("Could not send response {:?}", alice_to_bob)) - } - fn poll( - &mut self, - _: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> { - if let Some(event) = self.events.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); - } - - Poll::Pending - } -} - -impl Default for Behaviour { - fn default() -> Self { - let timeout = Duration::from_secs(TIMEOUT); - let mut config = RequestResponseConfig::default(); - config.set_request_timeout(timeout); - - Self { - rr: RequestResponse::new( - Codec::default(), - vec![(Message0Protocol, ProtocolSupport::Full)], - config, - ), - events: Default::default(), - } - } -} - -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { - match event { - RequestResponseEvent::Message { - message: - RequestResponseMessage::Request { - request, channel, .. - }, - .. - } => { - if let BobToAlice::Message0(msg) = request { - debug!("Received Message0"); - self.events.push_back(OutEvent::Msg { msg: *msg, channel }); - } - } - RequestResponseEvent::Message { - message: RequestResponseMessage::Response { .. }, - .. - } => panic!("Alice should not get a Response"), - RequestResponseEvent::InboundFailure { error, .. } => { - error!("Inbound failure: {:?}", error); - } - RequestResponseEvent::OutboundFailure { error, .. } => { - error!("Outbound failure: {:?}", error); - } - RequestResponseEvent::ResponseSent { .. } => { - debug!("Alice has sent Message0 as response to Bob"); - } - } - } -} diff --git a/swap/src/protocol/alice/message1.rs b/swap/src/protocol/alice/message1.rs deleted file mode 100644 index 94992b26..00000000 --- a/swap/src/protocol/alice/message1.rs +++ /dev/null @@ -1,117 +0,0 @@ -use crate::{ - network::request_response::{AliceToBob, BobToAlice, Codec, Message1Protocol, TIMEOUT}, - protocol::bob, -}; -use anyhow::{anyhow, Result}; -use ecdsa_fun::{adaptor::EncryptedSignature, Signature}; -use libp2p::{ - request_response::{ - handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, - RequestResponseEvent, RequestResponseMessage, ResponseChannel, - }, - swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, - NetworkBehaviour, -}; -use serde::{Deserialize, Serialize}; -use std::{ - collections::VecDeque, - task::{Context, Poll}, - time::Duration, -}; -use tracing::{debug, error}; - -#[derive(Debug)] -pub enum OutEvent { - Msg { - /// Received message from Bob. - msg: bob::Message1, - /// Channel to send back Alice's message 1. - channel: ResponseChannel, - }, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Message1 { - pub(crate) tx_cancel_sig: Signature, - pub(crate) tx_refund_encsig: EncryptedSignature, -} - -/// A `NetworkBehaviour` that represents send/recv of message 1. -#[derive(NetworkBehaviour)] -#[behaviour(out_event = "OutEvent", poll_method = "poll")] -#[allow(missing_debug_implementations)] -pub struct Behaviour { - rr: RequestResponse>, - #[behaviour(ignore)] - events: VecDeque, -} - -impl Behaviour { - pub fn send(&mut self, channel: ResponseChannel, msg: Message1) -> Result<()> { - let msg = AliceToBob::Message1(Box::new(msg)); - self.rr - .send_response(channel, msg) - .map_err(|alice_to_bob| anyhow!("Could not send response {:?}", alice_to_bob)) - } - - fn poll( - &mut self, - _: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> { - if let Some(event) = self.events.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); - } - - Poll::Pending - } -} - -impl Default for Behaviour { - fn default() -> Self { - let timeout = Duration::from_secs(TIMEOUT); - let mut config = RequestResponseConfig::default(); - config.set_request_timeout(timeout); - - Self { - rr: RequestResponse::new( - Codec::default(), - vec![(Message1Protocol, ProtocolSupport::Full)], - config, - ), - events: Default::default(), - } - } -} - -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { - match event { - RequestResponseEvent::Message { - message: - RequestResponseMessage::Request { - request, channel, .. - }, - .. - } => { - if let BobToAlice::Message1(msg) = request { - debug!("Received Message1"); - self.events.push_back(OutEvent::Msg { msg: *msg, channel }); - } - } - RequestResponseEvent::Message { - message: RequestResponseMessage::Response { .. }, - .. - } => panic!("Alice should not get a Response"), - RequestResponseEvent::InboundFailure { error, .. } => { - error!("Inbound failure: {:?}", error); - } - RequestResponseEvent::OutboundFailure { error, .. } => { - error!("Outbound failure: {:?}", error); - } - RequestResponseEvent::ResponseSent { .. } => { - debug!("Alice has sent an Message1 response to Bob"); - } - } - } -} diff --git a/swap/src/protocol/alice/message2.rs b/swap/src/protocol/alice/message2.rs deleted file mode 100644 index b89065e3..00000000 --- a/swap/src/protocol/alice/message2.rs +++ /dev/null @@ -1,104 +0,0 @@ -use crate::{ - network::request_response::{AliceToBob, BobToAlice, Codec, Message2Protocol, TIMEOUT}, - protocol::bob, -}; -use libp2p::{ - request_response::{ - handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, - RequestResponseEvent, RequestResponseMessage, - }, - swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, - NetworkBehaviour, PeerId, -}; -use std::{ - collections::VecDeque, - task::{Context, Poll}, - time::Duration, -}; -use tracing::{debug, error}; - -#[derive(Debug)] -pub enum OutEvent { - Msg { - msg: bob::Message2, - bob_peer_id: PeerId, - }, -} - -/// A `NetworkBehaviour` that represents receiving of message 2 from Bob. -#[derive(NetworkBehaviour)] -#[behaviour(out_event = "OutEvent", poll_method = "poll")] -#[allow(missing_debug_implementations)] -pub struct Behaviour { - rr: RequestResponse>, - #[behaviour(ignore)] - events: VecDeque, -} - -impl Behaviour { - fn poll( - &mut self, - _: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> { - if let Some(event) = self.events.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); - } - - Poll::Pending - } -} - -impl Default for Behaviour { - fn default() -> Self { - let timeout = Duration::from_secs(TIMEOUT); - let mut config = RequestResponseConfig::default(); - config.set_request_timeout(timeout); - - Self { - rr: RequestResponse::new( - Codec::default(), - vec![(Message2Protocol, ProtocolSupport::Full)], - config, - ), - events: Default::default(), - } - } -} - -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { - match event { - RequestResponseEvent::Message { - peer, - message: - RequestResponseMessage::Request { - request, channel, .. - }, - } => { - if let BobToAlice::Message2(msg) = request { - debug!("Received Message 2"); - self.events.push_back(OutEvent::Msg { - msg: *msg, - bob_peer_id: peer, - }); - // Send back empty response so that the request/response protocol completes. - let _ = self.rr.send_response(channel, AliceToBob::Message2); - } - } - RequestResponseEvent::Message { - message: RequestResponseMessage::Response { .. }, - .. - } => panic!("Alice should not get a Response"), - RequestResponseEvent::InboundFailure { error, .. } => { - error!("Inbound failure: {:?}", error); - } - RequestResponseEvent::OutboundFailure { error, .. } => { - error!("Outbound failure: {:?}", error); - } - RequestResponseEvent::ResponseSent { .. } => { - debug!("Alice has sent a Message2 response to Bob"); - } - } - } -} diff --git a/swap/src/protocol/alice/state.rs b/swap/src/protocol/alice/state.rs index 6fcee3b6..a6bbf21f 100644 --- a/swap/src/protocol/alice/state.rs +++ b/swap/src/protocol/alice/state.rs @@ -7,7 +7,11 @@ use crate::{ TxRefund, WatchForRawTransaction, }, monero, - protocol::{alice, alice::TransferProof, bob, bob::EncryptedSignature, SwapAmounts}, + protocol::{ + alice::{Message1, Message3, TransferProof}, + bob::{EncryptedSignature, Message0, Message2, Message4}, + SwapAmounts, + }, }; use anyhow::{anyhow, Context, Result}; use ecdsa_fun::{adaptor::Adaptor, nonce::Deterministic}; @@ -16,7 +20,6 @@ use rand::{CryptoRng, RngCore}; use serde::{Deserialize, Serialize}; use sha2::Sha256; use std::fmt; -use tracing::info; #[derive(Debug)] pub enum AliceState { @@ -87,6 +90,7 @@ pub struct State0 { pub a: bitcoin::SecretKey, pub s_a: cross_curve_dleq::Scalar, pub v_a: monero::PrivateViewKey, + pub dleq_proof_s_a: cross_curve_dleq::Proof, #[serde(with = "::bitcoin::util::amount::serde::as_sat")] pub btc: bitcoin::Amount, pub xmr: monero::Amount, @@ -98,7 +102,7 @@ pub struct State0 { impl State0 { #[allow(clippy::too_many_arguments)] - pub fn new( + pub fn new( a: bitcoin::SecretKey, s_a: cross_curve_dleq::Scalar, v_a: monero::PrivateViewKey, @@ -108,11 +112,18 @@ impl State0 { punish_timelock: Timelock, redeem_address: bitcoin::Address, punish_address: bitcoin::Address, - ) -> Self { + rng: &mut R, + ) -> Self + where + R: RngCore + CryptoRng, + { + let dleq_proof_s_a = cross_curve_dleq::Proof::new(rng, &s_a); + Self { a, s_a, v_a, + dleq_proof_s_a, redeem_address, punish_address, btc, @@ -122,24 +133,7 @@ impl State0 { } } - pub fn next_message(&self, rng: &mut R) -> alice::Message0 { - info!("Producing first message"); - let dleq_proof_s_a = cross_curve_dleq::Proof::new(rng, &self.s_a); - - alice::Message0 { - A: self.a.public(), - S_a_monero: monero::PublicKey::from_private_key(&monero::PrivateKey { - scalar: self.s_a.into_ed25519(), - }), - S_a_bitcoin: self.s_a.into_secp256k1().into(), - dleq_proof_s_a, - v_a: self.v_a, - redeem_address: self.redeem_address.clone(), - punish_address: self.punish_address.clone(), - } - } - - pub fn receive(self, msg: bob::Message0) -> Result { + pub fn receive(self, msg: Message0) -> Result { msg.dleq_proof_s_b.verify( msg.S_b_bitcoin.clone().into(), msg.S_b_monero @@ -157,6 +151,8 @@ impl State0 { S_b_monero: msg.S_b_monero, S_b_bitcoin: msg.S_b_bitcoin, v, + v_a: self.v_a, + dleq_proof_s_a: self.dleq_proof_s_a, btc: self.btc, xmr: self.xmr, cancel_timelock: self.cancel_timelock, @@ -176,6 +172,8 @@ pub struct State1 { S_b_monero: monero::PublicKey, S_b_bitcoin: bitcoin::PublicKey, v: monero::PrivateViewKey, + v_a: monero::PrivateViewKey, + dleq_proof_s_a: cross_curve_dleq::Proof, #[serde(with = "::bitcoin::util::amount::serde::as_sat")] btc: bitcoin::Amount, xmr: monero::Amount, @@ -187,7 +185,21 @@ pub struct State1 { } impl State1 { - pub fn receive(self, msg: bob::Message1) -> State2 { + pub fn next_message(&self) -> Message1 { + Message1 { + A: self.a.public(), + S_a_monero: monero::PublicKey::from_private_key(&monero::PrivateKey { + scalar: self.s_a.into_ed25519(), + }), + S_a_bitcoin: self.s_a.into_secp256k1().into(), + dleq_proof_s_a: self.dleq_proof_s_a.clone(), + v_a: self.v_a, + redeem_address: self.redeem_address.clone(), + punish_address: self.punish_address.clone(), + } + } + + pub fn receive(self, msg: Message2) -> State2 { State2 { a: self.a, B: self.B, @@ -227,7 +239,7 @@ pub struct State2 { } impl State2 { - pub fn next_message(&self) -> alice::Message1 { + pub fn next_message(&self) -> Message3 { let tx_cancel = bitcoin::TxCancel::new(&self.tx_lock, self.cancel_timelock, self.a.public(), self.B); @@ -240,13 +252,13 @@ impl State2 { let tx_refund_encsig = self.a.encsign(self.S_b_bitcoin, tx_refund.digest()); let tx_cancel_sig = self.a.sign(tx_cancel.digest()); - alice::Message1 { + Message3 { tx_refund_encsig, tx_cancel_sig, } } - pub fn receive(self, msg: bob::Message2) -> Result { + pub fn receive(self, msg: Message4) -> Result { let tx_cancel = bitcoin::TxCancel::new(&self.tx_lock, self.cancel_timelock, self.a.public(), self.B); bitcoin::verify_sig(&self.B, &tx_cancel.digest(), &msg.tx_cancel_sig) diff --git a/swap/src/protocol/alice/steps.rs b/swap/src/protocol/alice/steps.rs index 86b96b5c..d07b72d7 100644 --- a/swap/src/protocol/alice/steps.rs +++ b/swap/src/protocol/alice/steps.rs @@ -23,7 +23,6 @@ use futures::{ pin_mut, }; use libp2p::PeerId; -use rand::rngs::OsRng; use sha2::Sha256; use std::sync::Arc; use tokio::time::timeout; @@ -47,48 +46,21 @@ pub async fn negotiate( let event = timeout( execution_params.bob_time_to_act, - event_loop_handle.recv_request(), + event_loop_handle.recv_swap_request(), ) .await .context("Failed to receive swap request from Bob")??; event_loop_handle - .send_swap_response(event.channel, SwapResponse { xmr_amount }) + .send_swap_response(event.1, SwapResponse { xmr_amount }) .await?; - let (bob_message0, channel) = timeout( + let state3 = timeout( execution_params.bob_time_to_act, - event_loop_handle.recv_message0(), + event_loop_handle.execution_setup(bob_peer_id, state0), ) .await??; - let alice_message0 = state0.next_message(&mut OsRng); - event_loop_handle - .send_message0(channel, alice_message0) - .await?; - - let state1 = state0.receive(bob_message0)?; - - let (bob_message1, channel) = timeout( - execution_params.bob_time_to_act, - event_loop_handle.recv_message1(), - ) - .await??; - - let state2 = state1.receive(bob_message1); - - event_loop_handle - .send_message1(channel, state2.next_message()) - .await?; - - let bob_message2 = timeout( - execution_params.bob_time_to_act, - event_loop_handle.recv_message2(), - ) - .await??; - - let state3 = state2.receive(bob_message2)?; - Ok((bob_peer_id, state3)) } diff --git a/swap/src/protocol/alice/swap.rs b/swap/src/protocol/alice/swap.rs index 0cbcac28..beb17e91 100644 --- a/swap/src/protocol/alice/swap.rs +++ b/swap/src/protocol/alice/swap.rs @@ -247,9 +247,7 @@ async fn run_until_internal( .await; match publishded_redeem_tx { - Ok(_) => { - AliceState::BtcRedeemed - } + Ok(_) => AliceState::BtcRedeemed, Err(e) => { bail!("Waiting for Bitcoin transaction finality failed with {}! The redeem transaction was published, but it is not ensured that the transaction was included! You're screwed.", e) } diff --git a/swap/src/protocol/alice/swap_response.rs b/swap/src/protocol/alice/swap_response.rs index 62c1626c..69138642 100644 --- a/swap/src/protocol/alice/swap_response.rs +++ b/swap/src/protocol/alice/swap_response.rs @@ -1,29 +1,28 @@ use crate::{ monero, - network::request_response::{AliceToBob, BobToAlice, Codec, Swap, TIMEOUT}, - protocol::bob, + network::request_response::{CborCodec, Swap, TIMEOUT}, + protocol::bob::SwapRequest, }; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Error, Result}; use libp2p::{ request_response::{ - handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, - RequestResponseEvent, RequestResponseMessage, ResponseChannel, + ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, + RequestResponseMessage, ResponseChannel, }, - swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, NetworkBehaviour, }; use serde::{Deserialize, Serialize}; -use std::{ - collections::VecDeque, - task::{Context, Poll}, - time::Duration, -}; -use tracing::{debug, error}; +use std::time::Duration; +use tracing::debug; #[derive(Debug)] -pub struct OutEvent { - pub msg: bob::SwapRequest, - pub channel: ResponseChannel, +pub enum OutEvent { + MsgReceived { + msg: SwapRequest, + channel: ResponseChannel, + }, + ResponseSent, + Failure(Error), } #[derive(Copy, Clone, Debug, Serialize, Deserialize)] @@ -31,37 +30,58 @@ pub struct SwapResponse { pub xmr_amount: monero::Amount, } +impl From> for OutEvent { + fn from(event: RequestResponseEvent) -> Self { + match event { + RequestResponseEvent::Message { + peer, + message: + RequestResponseMessage::Request { + request, channel, .. + }, + .. + } => { + debug!("Received swap request from {}", peer); + OutEvent::MsgReceived { + msg: request, + channel, + } + } + RequestResponseEvent::Message { + message: RequestResponseMessage::Response { .. }, + .. + } => OutEvent::Failure(anyhow!("Alice should not get a Response")), + RequestResponseEvent::InboundFailure { error, .. } => { + OutEvent::Failure(anyhow!("Inbound failure: {:?}", error)) + } + RequestResponseEvent::OutboundFailure { error, .. } => { + OutEvent::Failure(anyhow!("Outbound failure: {:?}", error)) + } + RequestResponseEvent::ResponseSent { .. } => OutEvent::ResponseSent, + } + } +} + /// A `NetworkBehaviour` that represents negotiate a swap using Swap /// request/response. #[derive(NetworkBehaviour)] -#[behaviour(out_event = "OutEvent", poll_method = "poll")] +#[behaviour(out_event = "OutEvent", event_process = false)] #[allow(missing_debug_implementations)] pub struct Behaviour { - rr: RequestResponse>, - #[behaviour(ignore)] - events: VecDeque, + rr: RequestResponse>, } impl Behaviour { /// Alice always sends her messages as a response to a request from Bob. - pub fn send(&mut self, channel: ResponseChannel, msg: SwapResponse) -> Result<()> { - let msg = AliceToBob::SwapResponse(Box::new(msg)); + pub fn send( + &mut self, + channel: ResponseChannel, + msg: SwapResponse, + ) -> Result<()> { self.rr .send_response(channel, msg) .map_err(|_| anyhow!("Sending swap response failed")) } - - fn poll( - &mut self, - _: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> { - if let Some(event) = self.events.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); - } - - Poll::Pending - } } impl Default for Behaviour { @@ -73,43 +93,10 @@ impl Default for Behaviour { Self { rr: RequestResponse::new( - Codec::default(), - vec![(Swap, ProtocolSupport::Full)], + CborCodec::default(), + vec![(Swap, ProtocolSupport::Inbound)], config, ), - events: Default::default(), - } - } -} - -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { - match event { - RequestResponseEvent::Message { - message: - RequestResponseMessage::Request { - request, channel, .. - }, - .. - } => { - if let BobToAlice::SwapRequest(msg) = request { - debug!("Received swap request"); - self.events.push_back(OutEvent { msg: *msg, channel }) - } - } - RequestResponseEvent::Message { - message: RequestResponseMessage::Response { .. }, - .. - } => panic!("Alice should not get a Response"), - RequestResponseEvent::InboundFailure { error, .. } => { - error!("Inbound failure: {:?}", error); - } - RequestResponseEvent::OutboundFailure { error, .. } => { - error!("Outbound failure: {:?}", error); - } - RequestResponseEvent::ResponseSent { .. } => { - debug!("Alice has sent a swap response to Bob"); - } } } } diff --git a/swap/src/protocol/alice/transfer_proof.rs b/swap/src/protocol/alice/transfer_proof.rs index 422fa782..d7c06cdc 100644 --- a/swap/src/protocol/alice/transfer_proof.rs +++ b/swap/src/protocol/alice/transfer_proof.rs @@ -1,62 +1,42 @@ use crate::{ monero, - network::request_response::{OneShotCodec, Request, Response, TransferProofProtocol, TIMEOUT}, + network::request_response::{CborCodec, TransferProofProtocol, TIMEOUT}, }; +use anyhow::{anyhow, Error}; use libp2p::{ request_response::{ - handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, - RequestResponseEvent, RequestResponseMessage, + ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, + RequestResponseMessage, }, - swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, NetworkBehaviour, PeerId, }; use serde::{Deserialize, Serialize}; -use std::{ - collections::VecDeque, - task::{Context, Poll}, - time::Duration, -}; -use tracing::error; +use std::time::Duration; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct TransferProof { pub tx_lock_proof: monero::TransferProof, } -#[derive(Debug, Copy, Clone)] +#[derive(Debug)] pub enum OutEvent { Acknowledged, + Failure(Error), } /// A `NetworkBehaviour` that represents sending the Monero transfer proof to /// Bob. #[derive(NetworkBehaviour)] -#[behaviour(out_event = "OutEvent", poll_method = "poll")] +#[behaviour(out_event = "OutEvent", event_process = false)] #[allow(missing_debug_implementations)] pub struct Behaviour { - rr: RequestResponse>, - #[behaviour(ignore)] - events: VecDeque, + rr: RequestResponse>, } impl Behaviour { pub fn send(&mut self, bob: PeerId, msg: TransferProof) { - let msg = Request::TransferProof(Box::new(msg)); let _id = self.rr.send_request(&bob, msg); } - - fn poll( - &mut self, - _: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> - { - if let Some(event) = self.events.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); - } - - Poll::Pending - } } impl Default for Behaviour { @@ -67,37 +47,36 @@ impl Default for Behaviour { Self { rr: RequestResponse::new( - OneShotCodec::default(), + CborCodec::default(), vec![(TransferProofProtocol, ProtocolSupport::Outbound)], config, ), - events: Default::default(), } } } -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { +impl From> for OutEvent { + fn from(event: RequestResponseEvent) -> Self { match event { RequestResponseEvent::Message { message: RequestResponseMessage::Request { .. }, .. - } => panic!("Alice should never get a transfer proof request from Bob"), + } => OutEvent::Failure(anyhow!( + "Alice should never get a transfer proof request from Bob" + )), RequestResponseEvent::Message { - message: RequestResponseMessage::Response { response, .. }, + message: RequestResponseMessage::Response { .. }, .. - } => { - if let Response::TransferProof = response { - self.events.push_back(OutEvent::Acknowledged); - } - } + } => OutEvent::Acknowledged, RequestResponseEvent::InboundFailure { error, .. } => { - error!("Inbound failure: {:?}", error); + OutEvent::Failure(anyhow!("Inbound failure: {:?}", error)) } RequestResponseEvent::OutboundFailure { error, .. } => { - error!("Outbound failure: {:?}", error); + OutEvent::Failure(anyhow!("Outbound failure: {:?}", error)) + } + RequestResponseEvent::ResponseSent { .. } => { + OutEvent::Failure(anyhow!("Alice should not send a response")) } - RequestResponseEvent::ResponseSent { .. } => {} } } } diff --git a/swap/src/protocol/bob.rs b/swap/src/protocol/bob.rs index 810af8c2..94afa7fa 100644 --- a/swap/src/protocol/bob.rs +++ b/swap/src/protocol/bob.rs @@ -3,15 +3,16 @@ use crate::{ bitcoin, database, database::Database, + execution_params::ExecutionParams, monero, network, network::{ peer_tracker::{self, PeerTracker}, transport::build, }, - protocol::{alice, bob, SwapAmounts}, + protocol::{alice, alice::TransferProof, bob, SwapAmounts}, seed::Seed, }; -use anyhow::{bail, Result}; +use anyhow::{bail, Error, Result}; use libp2p::{core::Multiaddr, identity::Keypair, NetworkBehaviour, PeerId}; use rand::rngs::OsRng; use std::{path::PathBuf, sync::Arc}; @@ -21,20 +22,16 @@ use uuid::Uuid; pub use self::{ encrypted_signature::EncryptedSignature, event_loop::{EventLoop, EventLoopHandle}, - message0::Message0, - message1::Message1, - message2::Message2, state::*, swap::{run, run_until}, swap_request::*, }; -use crate::{execution_params::ExecutionParams, protocol::alice::TransferProof}; +pub use execution_setup::{Message0, Message2, Message4}; +use libp2p::request_response::ResponseChannel; mod encrypted_signature; pub mod event_loop; -mod message0; -mod message1; -mod message2; +mod execution_setup; pub mod state; pub mod swap; mod swap_request; @@ -162,6 +159,7 @@ impl Builder { } } } + fn init_event_loop( &self, ) -> Result<(bob::event_loop::EventLoop, bob::event_loop::EventLoopHandle)> { @@ -174,6 +172,7 @@ impl Builder { self.peer_id, self.alice_peer_id, self.alice_address.clone(), + self.bitcoin_wallet.clone(), ) } @@ -203,15 +202,18 @@ impl Builder { } } -#[derive(Debug, Clone)] +#[derive(Debug)] pub enum OutEvent { ConnectionEstablished(PeerId), SwapResponse(alice::SwapResponse), - Message0(Box), - Message1(Box), - Message2, - TransferProof(Box), + ExecutionSetupDone(Result>), + TransferProof { + msg: Box, + channel: ResponseChannel<()>, + }, EncryptedSignatureAcknowledged, + ResponseSent, // Same variant is used for all messages as no processing is done + Failure(Error), } impl From for OutEvent { @@ -226,46 +228,42 @@ impl From for OutEvent { impl From for OutEvent { fn from(event: swap_request::OutEvent) -> Self { - OutEvent::SwapResponse(event.swap_response) - } -} - -impl From for OutEvent { - fn from(event: message0::OutEvent) -> Self { + use swap_request::OutEvent::*; match event { - message0::OutEvent::Msg(msg) => OutEvent::Message0(Box::new(msg)), + MsgReceived(swap_response) => OutEvent::SwapResponse(swap_response), + Failure(err) => OutEvent::Failure(err.context("Failre with Swap Request")), } } } -impl From for OutEvent { - fn from(event: message1::OutEvent) -> Self { +impl From for OutEvent { + fn from(event: execution_setup::OutEvent) -> Self { match event { - message1::OutEvent::Msg(msg) => OutEvent::Message1(Box::new(msg)), - } - } -} - -impl From for OutEvent { - fn from(event: message2::OutEvent) -> Self { - match event { - message2::OutEvent::Msg => OutEvent::Message2, + execution_setup::OutEvent::Done(res) => OutEvent::ExecutionSetupDone(res.map(Box::new)), } } } impl From for OutEvent { fn from(event: transfer_proof::OutEvent) -> Self { + use transfer_proof::OutEvent::*; match event { - transfer_proof::OutEvent::Msg(msg) => OutEvent::TransferProof(Box::new(msg)), + MsgReceived { msg, channel } => OutEvent::TransferProof { + msg: Box::new(msg), + channel, + }, + AckSent => OutEvent::ResponseSent, + Failure(err) => OutEvent::Failure(err.context("Failure with Transfer Proof")), } } } impl From for OutEvent { fn from(event: encrypted_signature::OutEvent) -> Self { + use encrypted_signature::OutEvent::*; match event { - encrypted_signature::OutEvent::Acknowledged => OutEvent::EncryptedSignatureAcknowledged, + Acknowledged => OutEvent::EncryptedSignatureAcknowledged, + Failure(err) => OutEvent::Failure(err.context("Failure with Encrypted Signature")), } } } @@ -277,9 +275,7 @@ impl From for OutEvent { pub struct Behaviour { pt: PeerTracker, swap_request: swap_request::Behaviour, - message0: message0::Behaviour, - message1: message1::Behaviour, - message2: message2::Behaviour, + execution_setup: execution_setup::Behaviour, transfer_proof: transfer_proof::Behaviour, encrypted_signature: encrypted_signature::Behaviour, } @@ -291,22 +287,15 @@ impl Behaviour { info!("Requesting swap from: {}", alice); } - /// Sends Bob's first message to Alice. - pub fn send_message0(&mut self, alice: PeerId, msg: bob::Message0) { - self.message0.send(alice, msg); - debug!("Message0 sent"); - } - - /// Sends Bob's second message to Alice. - pub fn send_message1(&mut self, alice: PeerId, msg: bob::Message1) { - self.message1.send(alice, msg); - debug!("Message1 sent"); - } - - /// Sends Bob's third message to Alice. - pub fn send_message2(&mut self, alice: PeerId, msg: bob::Message2) { - self.message2.send(alice, msg); - debug!("Message2 sent"); + pub fn start_execution_setup( + &mut self, + alice_peer_id: PeerId, + state0: State0, + bitcoin_wallet: Arc, + ) { + self.execution_setup + .run(alice_peer_id, state0, bitcoin_wallet); + info!("Start execution setup with {}", alice_peer_id); } /// Sends Bob's fourth message to Alice. diff --git a/swap/src/protocol/bob/encrypted_signature.rs b/swap/src/protocol/bob/encrypted_signature.rs index cf167f36..96a253a2 100644 --- a/swap/src/protocol/bob/encrypted_signature.rs +++ b/swap/src/protocol/bob/encrypted_signature.rs @@ -1,61 +1,38 @@ -use crate::network::request_response::{ - EncryptedSignatureProtocol, OneShotCodec, Request, Response, TIMEOUT, -}; +use crate::network::request_response::{CborCodec, EncryptedSignatureProtocol, TIMEOUT}; +use anyhow::{anyhow, Error}; use libp2p::{ request_response::{ - handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, - RequestResponseEvent, RequestResponseMessage, + ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, + RequestResponseMessage, }, - swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, NetworkBehaviour, PeerId, }; use serde::{Deserialize, Serialize}; -use std::{ - collections::VecDeque, - task::{Context, Poll}, - time::Duration, -}; -use tracing::error; +use std::time::Duration; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct EncryptedSignature { pub tx_redeem_encsig: crate::bitcoin::EncryptedSignature, } -#[derive(Debug, Copy, Clone)] +#[derive(Debug)] pub enum OutEvent { Acknowledged, + Failure(Error), } /// A `NetworkBehaviour` that represents sending encrypted signature to Alice. #[derive(NetworkBehaviour)] -#[behaviour(out_event = "OutEvent", poll_method = "poll")] +#[behaviour(out_event = "OutEvent", event_process = false)] #[allow(missing_debug_implementations)] pub struct Behaviour { - rr: RequestResponse>, - #[behaviour(ignore)] - events: VecDeque, + rr: RequestResponse>, } impl Behaviour { pub fn send(&mut self, alice: PeerId, msg: EncryptedSignature) { - let msg = Request::EncryptedSignature(Box::new(msg)); let _id = self.rr.send_request(&alice, msg); } - - fn poll( - &mut self, - _: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll< - NetworkBehaviourAction>, OutEvent>, - > { - if let Some(event) = self.events.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); - } - - Poll::Pending - } } impl Default for Behaviour { @@ -66,39 +43,34 @@ impl Default for Behaviour { Self { rr: RequestResponse::new( - OneShotCodec::default(), + CborCodec::default(), vec![(EncryptedSignatureProtocol, ProtocolSupport::Outbound)], config, ), - events: Default::default(), } } } -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { +impl From> for OutEvent { + fn from(event: RequestResponseEvent) -> Self { match event { RequestResponseEvent::Message { message: RequestResponseMessage::Request { .. }, .. - } => panic!("Bob should never get a request from Alice"), + } => OutEvent::Failure(anyhow!("Bob should never get a request from Alice")), RequestResponseEvent::Message { - message: RequestResponseMessage::Response { response, .. }, + message: RequestResponseMessage::Response { .. }, .. - } => { - if let Response::EncryptedSignature = response { - self.events.push_back(OutEvent::Acknowledged); - } - } + } => OutEvent::Acknowledged, RequestResponseEvent::InboundFailure { error, .. } => { - error!("Inbound failure: {:?}", error); + OutEvent::Failure(anyhow!("Inbound failure: {:?}", error)) } RequestResponseEvent::OutboundFailure { error, .. } => { - error!("Outbound failure: {:?}", error); - } - RequestResponseEvent::ResponseSent { .. } => { - unreachable!("Bob does not send the encrypted signature response to Alice"); + OutEvent::Failure(anyhow!("Outbound failure: {:?}", error)) } + RequestResponseEvent::ResponseSent { .. } => OutEvent::Failure(anyhow!( + "Bob does not send the encrypted signature response to Alice" + )), } } } diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index 86f8bac6..4d8229d6 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -1,15 +1,16 @@ use crate::{ + bitcoin, bitcoin::EncryptedSignature, network::{transport::SwapTransport, TokioExecutor}, protocol::{ - alice, alice::{SwapResponse, TransferProof}, - bob::{self, Behaviour, OutEvent, SwapRequest}, + bob::{Behaviour, OutEvent, State0, State2, SwapRequest}, }, }; use anyhow::{anyhow, Result}; use futures::FutureExt; use libp2p::{core::Multiaddr, PeerId}; +use std::sync::Arc; use tokio::sync::mpsc::{Receiver, Sender}; use tracing::{debug, error, info}; @@ -35,15 +36,12 @@ impl Default for Channels { #[derive(Debug)] pub struct EventLoopHandle { recv_swap_response: Receiver, - recv_message0: Receiver, - recv_message1: Receiver, + start_execution_setup: Sender, + done_execution_setup: Receiver>, recv_transfer_proof: Receiver, conn_established: Receiver, dial_alice: Sender<()>, send_swap_request: Sender, - send_message0: Sender, - send_message1: Sender, - send_message2: Sender, send_encrypted_signature: Sender, recv_encrypted_signature_ack: Receiver<()>, } @@ -56,18 +54,13 @@ impl EventLoopHandle { .ok_or_else(|| anyhow!("Failed to receive swap response from Alice")) } - pub async fn recv_message0(&mut self) -> Result { - self.recv_message0 - .recv() - .await - .ok_or_else(|| anyhow!("Failed to receive message 0 from Alice")) - } + pub async fn execution_setup(&mut self, state0: State0) -> Result { + let _ = self.start_execution_setup.send(state0).await?; - pub async fn recv_message1(&mut self) -> Result { - self.recv_message1 + self.done_execution_setup .recv() .await - .ok_or_else(|| anyhow!("Failed to receive message 1 from Alice")) + .ok_or_else(|| anyhow!("Failed to setup execution with Alice"))? } pub async fn recv_transfer_proof(&mut self) -> Result { @@ -96,21 +89,6 @@ impl EventLoopHandle { Ok(()) } - pub async fn send_message0(&mut self, msg: bob::Message0) -> Result<()> { - let _ = self.send_message0.send(msg).await?; - Ok(()) - } - - pub async fn send_message1(&mut self, msg: bob::Message1) -> Result<()> { - let _ = self.send_message1.send(msg).await?; - Ok(()) - } - - pub async fn send_message2(&mut self, msg: bob::Message2) -> Result<()> { - let _ = self.send_message2.send(msg).await?; - Ok(()) - } - pub async fn send_encrypted_signature( &mut self, tx_redeem_encsig: EncryptedSignature, @@ -128,17 +106,15 @@ impl EventLoopHandle { #[allow(missing_debug_implementations)] pub struct EventLoop { swarm: libp2p::Swarm, + bitcoin_wallet: Arc, alice_peer_id: PeerId, recv_swap_response: Sender, - recv_message0: Sender, - recv_message1: Sender, + start_execution_setup: Receiver, + done_execution_setup: Sender>, recv_transfer_proof: Sender, dial_alice: Receiver<()>, conn_established: Sender, send_swap_request: Receiver, - send_message0: Receiver, - send_message1: Receiver, - send_message2: Receiver, send_encrypted_signature: Receiver, recv_encrypted_signature_ack: Sender<()>, } @@ -150,6 +126,7 @@ impl EventLoop { peer_id: PeerId, alice_peer_id: PeerId, alice_addr: Multiaddr, + bitcoin_wallet: Arc, ) -> Result<(Self, EventLoopHandle)> { let mut swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, peer_id) .executor(Box::new(TokioExecutor { @@ -160,46 +137,38 @@ impl EventLoop { swarm.add_address(alice_peer_id, alice_addr); let swap_response = Channels::new(); - let recv_message0 = Channels::new(); - let recv_message1 = Channels::new(); + let start_execution_setup = Channels::new(); + let done_execution_setup = Channels::new(); let recv_transfer_proof = Channels::new(); let dial_alice = Channels::new(); let conn_established = Channels::new(); let send_swap_request = Channels::new(); - let send_message0 = Channels::new(); - let send_message1 = Channels::new(); - let send_message2 = Channels::new(); let send_encrypted_signature = Channels::new(); let recv_encrypted_signature_ack = Channels::new(); let event_loop = EventLoop { swarm, alice_peer_id, + bitcoin_wallet, recv_swap_response: swap_response.sender, - recv_message0: recv_message0.sender, - recv_message1: recv_message1.sender, + start_execution_setup: start_execution_setup.receiver, + done_execution_setup: done_execution_setup.sender, recv_transfer_proof: recv_transfer_proof.sender, conn_established: conn_established.sender, dial_alice: dial_alice.receiver, send_swap_request: send_swap_request.receiver, - send_message0: send_message0.receiver, - send_message1: send_message1.receiver, - send_message2: send_message2.receiver, send_encrypted_signature: send_encrypted_signature.receiver, recv_encrypted_signature_ack: recv_encrypted_signature_ack.sender, }; let handle = EventLoopHandle { recv_swap_response: swap_response.receiver, - recv_message0: recv_message0.receiver, - recv_message1: recv_message1.receiver, + start_execution_setup: start_execution_setup.sender, + done_execution_setup: done_execution_setup.receiver, recv_transfer_proof: recv_transfer_proof.receiver, conn_established: conn_established.receiver, dial_alice: dial_alice.sender, send_swap_request: send_swap_request.sender, - send_message0: send_message0.sender, - send_message1: send_message1.sender, - send_message2: send_message2.sender, send_encrypted_signature: send_encrypted_signature.sender, recv_encrypted_signature_ack: recv_encrypted_signature_ack.receiver, }; @@ -218,20 +187,24 @@ impl EventLoop { OutEvent::SwapResponse(msg) => { let _ = self.recv_swap_response.send(msg).await; }, - OutEvent::Message0(msg) => { - let _ = self.recv_message0.send(*msg).await; + OutEvent::ExecutionSetupDone(res) => { + let _ = self.done_execution_setup.send(res.map(|state|*state)).await; } - OutEvent::Message1(msg) => { - let _ = self.recv_message1.send(*msg).await; - } - OutEvent::Message2 => info!("Alice acknowledged message 2 received"), - OutEvent::TransferProof(msg) => { + OutEvent::TransferProof{ msg, channel }=> { let _ = self.recv_transfer_proof.send(*msg).await; + // Send back empty response so that the request/response protocol completes. + if let Err(error) = self.swarm.transfer_proof.send_ack(channel) { + error!("Failed to send Transfer Proof ack: {:?}", error); + } } OutEvent::EncryptedSignatureAcknowledged => { debug!("Alice acknowledged encrypted signature"); let _ = self.recv_encrypted_signature_ack.send(()).await; } + OutEvent::ResponseSent => {} + OutEvent::Failure(err) => { + error!("Communication error: {:#}", err) + } } }, option = self.dial_alice.recv().fuse() => { @@ -255,21 +228,11 @@ impl EventLoop { self.swarm.send_swap_request(self.alice_peer_id, swap_request); } }, - - msg0 = self.send_message0.recv().fuse() => { - if let Some(msg) = msg0 { - self.swarm.send_message0(self.alice_peer_id, msg); - } - } - - msg1 = self.send_message1.recv().fuse() => { - if let Some(msg) = msg1 { - self.swarm.send_message1(self.alice_peer_id, msg); - } - }, - msg2 = self.send_message2.recv().fuse() => { - if let Some(msg) = msg2 { - self.swarm.send_message2(self.alice_peer_id, msg); + option = self.start_execution_setup.recv().fuse() => { + if let Some(state0) = option { + let _ = self + .swarm + .start_execution_setup(self.alice_peer_id, state0, self.bitcoin_wallet.clone()); } }, encrypted_signature = self.send_encrypted_signature.recv().fuse() => { diff --git a/swap/src/protocol/bob/execution_setup.rs b/swap/src/protocol/bob/execution_setup.rs new file mode 100644 index 00000000..dc87f275 --- /dev/null +++ b/swap/src/protocol/bob/execution_setup.rs @@ -0,0 +1,108 @@ +use crate::{ + bitcoin::Signature, + network::request_response::BUF_SIZE, + protocol::{ + alice::{Message1, Message3}, + bob::{State0, State2}, + }, +}; +use anyhow::{Context, Error, Result}; +use libp2p::PeerId; +use libp2p_async_await::BehaviourOutEvent; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Message0 { + pub(crate) B: crate::bitcoin::PublicKey, + pub(crate) S_b_monero: monero::PublicKey, + pub(crate) S_b_bitcoin: crate::bitcoin::PublicKey, + pub(crate) dleq_proof_s_b: cross_curve_dleq::Proof, + pub(crate) v_b: crate::monero::PrivateViewKey, + pub(crate) refund_address: bitcoin::Address, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Message2 { + pub(crate) tx_lock: crate::bitcoin::TxLock, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Message4 { + pub(crate) tx_punish_sig: Signature, + pub(crate) tx_cancel_sig: Signature, +} + +#[derive(Debug)] +pub enum OutEvent { + Done(Result), +} + +impl From> for OutEvent { + fn from(event: BehaviourOutEvent<(), State2, Error>) -> Self { + match event { + BehaviourOutEvent::Outbound(_, Ok(State2)) => OutEvent::Done(Ok(State2)), + BehaviourOutEvent::Outbound(_, Err(e)) => OutEvent::Done(Err(e)), + BehaviourOutEvent::Inbound(..) => unreachable!("Bob only supports outbound"), + } + } +} + +#[derive(libp2p::NetworkBehaviour)] +#[behaviour(out_event = "OutEvent", event_process = false)] +pub struct Behaviour { + inner: libp2p_async_await::Behaviour<(), State2, anyhow::Error>, +} + +impl Default for Behaviour { + fn default() -> Self { + Self { + inner: libp2p_async_await::Behaviour::new(b"/comit/xmr/btc/execution_setup/1.0.0"), + } + } +} + +impl Behaviour { + pub fn run( + &mut self, + alice: PeerId, + state0: State0, + bitcoin_wallet: Arc, + ) { + self.inner + .do_protocol_dialer(alice, move |mut substream| async move { + substream + .write_message( + &serde_cbor::to_vec(&state0.next_message()) + .context("failed to serialize message0")?, + ) + .await?; + + let message1 = + serde_cbor::from_slice::(&substream.read_message(BUF_SIZE).await?) + .context("failed to deserialize message1")?; + let state1 = state0.receive(bitcoin_wallet.as_ref(), message1).await?; + + substream + .write_message( + &serde_cbor::to_vec(&state1.next_message()) + .context("failed to serialize message2")?, + ) + .await?; + + let message3 = + serde_cbor::from_slice::(&substream.read_message(BUF_SIZE).await?) + .context("failed to deserialize message3")?; + let state2 = state1.receive(message3)?; + + substream + .write_message( + &serde_cbor::to_vec(&state2.next_message()) + .context("failed to serialize message4")?, + ) + .await?; + + Ok(state2) + }) + } +} diff --git a/swap/src/protocol/bob/message0.rs b/swap/src/protocol/bob/message0.rs deleted file mode 100644 index 1903a62e..00000000 --- a/swap/src/protocol/bob/message0.rs +++ /dev/null @@ -1,110 +0,0 @@ -use crate::{ - bitcoin, monero, - network::request_response::{AliceToBob, BobToAlice, Codec, Message0Protocol, TIMEOUT}, - protocol::{alice, bob}, -}; -use libp2p::{ - request_response::{ - handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, - RequestResponseEvent, RequestResponseMessage, - }, - swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, - NetworkBehaviour, PeerId, -}; -use serde::{Deserialize, Serialize}; -use std::{ - collections::VecDeque, - task::{Context, Poll}, - time::Duration, -}; -use tracing::{debug, error}; - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Message0 { - pub(crate) B: bitcoin::PublicKey, - pub(crate) S_b_monero: monero::PublicKey, - pub(crate) S_b_bitcoin: bitcoin::PublicKey, - pub(crate) dleq_proof_s_b: cross_curve_dleq::Proof, - pub(crate) v_b: monero::PrivateViewKey, - pub(crate) refund_address: bitcoin::Address, -} - -#[derive(Debug)] -pub enum OutEvent { - Msg(alice::Message0), -} - -/// A `NetworkBehaviour` that represents send/recv of message 0. -#[derive(NetworkBehaviour)] -#[behaviour(out_event = "OutEvent", poll_method = "poll")] -#[allow(missing_debug_implementations)] -pub struct Behaviour { - rr: RequestResponse>, - #[behaviour(ignore)] - events: VecDeque, -} - -impl Behaviour { - pub fn send(&mut self, alice: PeerId, msg: bob::Message0) { - let msg = BobToAlice::Message0(Box::new(msg)); - let _id = self.rr.send_request(&alice, msg); - } - - fn poll( - &mut self, - _: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> { - if let Some(event) = self.events.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); - } - - Poll::Pending - } -} - -impl Default for Behaviour { - fn default() -> Self { - let timeout = Duration::from_secs(TIMEOUT); - let mut config = RequestResponseConfig::default(); - config.set_request_timeout(timeout); - - Self { - rr: RequestResponse::new( - Codec::default(), - vec![(Message0Protocol, ProtocolSupport::Full)], - config, - ), - events: Default::default(), - } - } -} - -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { - match event { - RequestResponseEvent::Message { - message: RequestResponseMessage::Request { .. }, - .. - } => panic!("Bob should never get a request from Alice"), - RequestResponseEvent::Message { - message: RequestResponseMessage::Response { response, .. }, - .. - } => { - if let AliceToBob::Message0(msg) = response { - debug!("Received Message0"); - self.events.push_back(OutEvent::Msg(*msg)); - } - } - RequestResponseEvent::InboundFailure { error, .. } => { - error!("Inbound failure: {:?}", error); - } - RequestResponseEvent::OutboundFailure { error, .. } => { - error!("Outbound failure: {:?}", error); - } - RequestResponseEvent::ResponseSent { .. } => { - unreachable!("Bob does not send a message0 response to Alice"); - } - } - } -} diff --git a/swap/src/protocol/bob/message1.rs b/swap/src/protocol/bob/message1.rs deleted file mode 100644 index 4385ad54..00000000 --- a/swap/src/protocol/bob/message1.rs +++ /dev/null @@ -1,105 +0,0 @@ -use crate::{ - bitcoin, - network::request_response::{AliceToBob, BobToAlice, Codec, Message1Protocol, TIMEOUT}, - protocol::alice, -}; -use libp2p::{ - request_response::{ - handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, - RequestResponseEvent, RequestResponseMessage, - }, - swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, - NetworkBehaviour, PeerId, -}; -use serde::{Deserialize, Serialize}; -use std::{ - collections::VecDeque, - task::{Context, Poll}, - time::Duration, -}; -use tracing::{debug, error}; - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Message1 { - pub(crate) tx_lock: bitcoin::TxLock, -} - -#[derive(Debug)] -pub enum OutEvent { - Msg(alice::Message1), -} - -/// A `NetworkBehaviour` that represents send/recv of message 1. -#[derive(NetworkBehaviour)] -#[behaviour(out_event = "OutEvent", poll_method = "poll")] -#[allow(missing_debug_implementations)] -pub struct Behaviour { - rr: RequestResponse>, - #[behaviour(ignore)] - events: VecDeque, -} - -impl Behaviour { - pub fn send(&mut self, alice: PeerId, msg: Message1) { - let msg = BobToAlice::Message1(Box::new(msg)); - let _id = self.rr.send_request(&alice, msg); - } - - fn poll( - &mut self, - _: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> { - if let Some(event) = self.events.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); - } - - Poll::Pending - } -} - -impl Default for Behaviour { - fn default() -> Self { - let timeout = Duration::from_secs(TIMEOUT); - let mut config = RequestResponseConfig::default(); - config.set_request_timeout(timeout); - - Self { - rr: RequestResponse::new( - Codec::default(), - vec![(Message1Protocol, ProtocolSupport::Full)], - config, - ), - events: Default::default(), - } - } -} - -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { - match event { - RequestResponseEvent::Message { - message: RequestResponseMessage::Request { .. }, - .. - } => panic!("Bob should never get a request from Alice"), - RequestResponseEvent::Message { - message: RequestResponseMessage::Response { response, .. }, - .. - } => { - if let AliceToBob::Message1(msg) = response { - debug!("Received Message1"); - self.events.push_back(OutEvent::Msg(*msg)); - } - } - RequestResponseEvent::InboundFailure { error, .. } => { - error!("Inbound failure: {:?}", error); - } - RequestResponseEvent::OutboundFailure { error, .. } => { - error!("Outbound failure: {:?}", error); - } - RequestResponseEvent::ResponseSent { .. } => { - unreachable!("Bob does not send a message 1 response to Alice"); - } - } - } -} diff --git a/swap/src/protocol/bob/message2.rs b/swap/src/protocol/bob/message2.rs deleted file mode 100644 index c767fd8b..00000000 --- a/swap/src/protocol/bob/message2.rs +++ /dev/null @@ -1,103 +0,0 @@ -use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Message2Protocol, TIMEOUT}; -use ecdsa_fun::Signature; -use libp2p::{ - request_response::{ - handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, - RequestResponseEvent, RequestResponseMessage, - }, - swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, - NetworkBehaviour, PeerId, -}; -use serde::{Deserialize, Serialize}; -use std::{ - collections::VecDeque, - task::{Context, Poll}, - time::Duration, -}; -use tracing::{debug, error}; - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Message2 { - pub(crate) tx_punish_sig: Signature, - pub(crate) tx_cancel_sig: Signature, -} - -#[derive(Clone, Copy, Debug)] -pub enum OutEvent { - Msg, -} - -/// A `NetworkBehaviour` that represents sending message 2 to Alice. -#[derive(NetworkBehaviour)] -#[behaviour(out_event = "OutEvent", poll_method = "poll")] -#[allow(missing_debug_implementations)] -pub struct Behaviour { - rr: RequestResponse>, - #[behaviour(ignore)] - events: VecDeque, -} - -impl Behaviour { - pub fn send(&mut self, alice: PeerId, msg: Message2) { - let msg = BobToAlice::Message2(Box::new(msg)); - let _id = self.rr.send_request(&alice, msg); - } - - fn poll( - &mut self, - _: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> { - if let Some(event) = self.events.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); - } - - Poll::Pending - } -} - -impl Default for Behaviour { - fn default() -> Self { - let timeout = Duration::from_secs(TIMEOUT); - let mut config = RequestResponseConfig::default(); - config.set_request_timeout(timeout); - - Self { - rr: RequestResponse::new( - Codec::default(), - vec![(Message2Protocol, ProtocolSupport::Full)], - config, - ), - events: VecDeque::default(), - } - } -} - -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { - match event { - RequestResponseEvent::Message { - message: RequestResponseMessage::Request { .. }, - .. - } => panic!("Bob should never get a request from Alice"), - RequestResponseEvent::Message { - message: RequestResponseMessage::Response { response, .. }, - .. - } => { - if let AliceToBob::Message2 = response { - debug!("Received Message 2 acknowledgement"); - self.events.push_back(OutEvent::Msg); - } - } - RequestResponseEvent::InboundFailure { error, .. } => { - error!("Inbound failure: {:?}", error); - } - RequestResponseEvent::OutboundFailure { error, .. } => { - error!("Outbound failure: {:?}", error); - } - RequestResponseEvent::ResponseSent { .. } => { - unreachable!("Bob does not send a Message2 response to Alice"); - } - } - } -} diff --git a/swap/src/protocol/bob/state.rs b/swap/src/protocol/bob/state.rs index f727291a..66c8c4eb 100644 --- a/swap/src/protocol/bob/state.rs +++ b/swap/src/protocol/bob/state.rs @@ -9,7 +9,11 @@ use crate::{ execution_params::ExecutionParams, monero, monero::{monero_private_key, TransferProof}, - protocol::{alice, bob, bob::EncryptedSignature, SwapAmounts}, + protocol::{ + alice::{Message1, Message3}, + bob::{EncryptedSignature, Message0, Message2, Message4}, + SwapAmounts, + }, }; use anyhow::{anyhow, Result}; use ecdsa_fun::{adaptor::Adaptor, nonce::Deterministic, Signature}; @@ -74,6 +78,7 @@ pub struct State0 { b: bitcoin::SecretKey, s_b: cross_curve_dleq::Scalar, v_b: monero::PrivateViewKey, + dleq_proof_s_b: cross_curve_dleq::Proof, #[serde(with = "::bitcoin::util::amount::serde::as_sat")] btc: bitcoin::Amount, xmr: monero::Amount, @@ -97,6 +102,7 @@ impl State0 { let s_b = cross_curve_dleq::Scalar::random(rng); let v_b = monero::PrivateViewKey::new_random(rng); + let dleq_proof_s_b = cross_curve_dleq::Proof::new(rng, &s_b); Self { b, @@ -104,6 +110,7 @@ impl State0 { v_b, btc, xmr, + dleq_proof_s_b, cancel_timelock, punish_timelock, refund_address, @@ -111,22 +118,20 @@ impl State0 { } } - pub fn next_message(&self, rng: &mut R) -> bob::Message0 { - let dleq_proof_s_b = cross_curve_dleq::Proof::new(rng, &self.s_b); - - bob::Message0 { + pub fn next_message(&self) -> Message0 { + Message0 { B: self.b.public(), S_b_monero: monero::PublicKey::from_private_key(&monero::PrivateKey { scalar: self.s_b.into_ed25519(), }), S_b_bitcoin: self.s_b.into_secp256k1().into(), - dleq_proof_s_b, + dleq_proof_s_b: self.dleq_proof_s_b.clone(), v_b: self.v_b, refund_address: self.refund_address.clone(), } } - pub async fn receive(self, wallet: &W, msg: alice::Message0) -> anyhow::Result + pub async fn receive(self, wallet: &W, msg: Message1) -> anyhow::Result where W: BuildTxLockPsbt + GetNetwork, { @@ -182,13 +187,13 @@ pub struct State1 { } impl State1 { - pub fn next_message(&self) -> bob::Message1 { - bob::Message1 { + pub fn next_message(&self) -> Message2 { + Message2 { tx_lock: self.tx_lock.clone(), } } - pub fn receive(self, msg: alice::Message1) -> Result { + pub fn receive(self, msg: Message3) -> Result { let tx_cancel = TxCancel::new(&self.tx_lock, self.cancel_timelock, self.A, self.b.public()); let tx_refund = bitcoin::TxRefund::new(&tx_cancel, &self.refund_address); @@ -245,14 +250,14 @@ pub struct State2 { } impl State2 { - pub fn next_message(&self) -> bob::Message2 { + pub fn next_message(&self) -> Message4 { let tx_cancel = TxCancel::new(&self.tx_lock, self.cancel_timelock, self.A, self.b.public()); let tx_cancel_sig = self.b.sign(tx_cancel.digest()); let tx_punish = bitcoin::TxPunish::new(&tx_cancel, &self.punish_address, self.punish_timelock); let tx_punish_sig = self.b.sign(tx_punish.digest()); - bob::Message2 { + Message4 { tx_punish_sig, tx_cancel_sig, } diff --git a/swap/src/protocol/bob/swap.rs b/swap/src/protocol/bob/swap.rs index ced72266..5036b52c 100644 --- a/swap/src/protocol/bob/swap.rs +++ b/swap/src/protocol/bob/swap.rs @@ -11,7 +11,6 @@ use crate::{ }; use anyhow::{bail, Result}; use async_recursion::async_recursion; -use rand::{rngs::OsRng, CryptoRng, RngCore}; use std::sync::Arc; use tokio::select; use tracing::info; @@ -44,7 +43,6 @@ pub async fn run_until( swap.db, swap.bitcoin_wallet, swap.monero_wallet, - OsRng, swap.swap_id, swap.execution_params, ) @@ -54,20 +52,16 @@ pub async fn run_until( // State machine driver for swap execution #[allow(clippy::too_many_arguments)] #[async_recursion] -async fn run_until_internal( +async fn run_until_internal( state: BobState, is_target_state: fn(&BobState) -> bool, mut event_loop_handle: EventLoopHandle, db: Database, bitcoin_wallet: Arc, monero_wallet: Arc, - mut rng: R, swap_id: Uuid, execution_params: ExecutionParams, -) -> Result -where - R: RngCore + CryptoRng + Send, -{ +) -> Result { info!("Current state: {}", state); if is_target_state(&state) { Ok(state) @@ -76,14 +70,7 @@ where BobState::Started { state0, amounts } => { event_loop_handle.dial().await?; - let state2 = negotiate( - state0, - amounts, - &mut event_loop_handle, - &mut rng, - bitcoin_wallet.clone(), - ) - .await?; + let state2 = negotiate(state0, amounts, &mut event_loop_handle).await?; let state = BobState::Negotiated(state2); let db_state = state.clone().into(); @@ -95,7 +82,6 @@ where db, bitcoin_wallet, monero_wallet, - rng, swap_id, execution_params, ) @@ -117,7 +103,6 @@ where db, bitcoin_wallet, monero_wallet, - rng, swap_id, execution_params, ) @@ -170,7 +155,6 @@ where db, bitcoin_wallet, monero_wallet, - rng, swap_id, execution_params, ) @@ -217,7 +201,6 @@ where db, bitcoin_wallet, monero_wallet, - rng, swap_id, execution_params, ) @@ -260,7 +243,6 @@ where db, bitcoin_wallet, monero_wallet, - rng, swap_id, execution_params, ) @@ -296,7 +278,6 @@ where db, bitcoin_wallet.clone(), monero_wallet, - rng, swap_id, execution_params, ) @@ -318,7 +299,6 @@ where db, bitcoin_wallet, monero_wallet, - rng, swap_id, execution_params, ) @@ -344,7 +324,6 @@ where db, bitcoin_wallet, monero_wallet, - rng, swap_id, execution_params, ) @@ -376,7 +355,6 @@ where db, bitcoin_wallet, monero_wallet, - rng, swap_id, execution_params, ) @@ -390,18 +368,13 @@ where } } -pub async fn negotiate( +pub async fn negotiate( state0: crate::protocol::bob::state::State0, amounts: SwapAmounts, - swarm: &mut EventLoopHandle, - mut rng: R, - bitcoin_wallet: Arc, -) -> Result -where - R: RngCore + CryptoRng + Send, -{ + event_loop_handle: &mut EventLoopHandle, +) -> Result { tracing::trace!("Starting negotiate"); - swarm + event_loop_handle .send_swap_request(SwapRequest { btc_amount: amounts.btc, }) @@ -409,17 +382,9 @@ where // TODO: Use this once Bob's CLI is modified to only pass xmr amount in // argument. - let _swap_response = swarm.recv_swap_response().await?; + let _swap_response = event_loop_handle.recv_swap_response().await?; - swarm.send_message0(state0.next_message(&mut rng)).await?; - let msg0 = swarm.recv_message0().await?; - let state1 = state0.receive(bitcoin_wallet.as_ref(), msg0).await?; - - swarm.send_message1(state1.next_message()).await?; - let msg1 = swarm.recv_message1().await?; - let state2 = state1.receive(msg1)?; - - swarm.send_message2(state2.next_message()).await?; + let state2 = event_loop_handle.execution_setup(state0).await?; Ok(state2) } diff --git a/swap/src/protocol/bob/swap_request.rs b/swap/src/protocol/bob/swap_request.rs index 8e1440fb..b7d50dad 100644 --- a/swap/src/protocol/bob/swap_request.rs +++ b/swap/src/protocol/bob/swap_request.rs @@ -1,23 +1,18 @@ use crate::{ - network::request_response::{AliceToBob, BobToAlice, Codec, Swap, TIMEOUT}, + network::request_response::{CborCodec, Swap, TIMEOUT}, protocol::alice::SwapResponse, }; -use anyhow::Result; +use anyhow::{anyhow, Error, Result}; use libp2p::{ request_response::{ - handler::RequestProtocol, ProtocolSupport, RequestId, RequestResponse, - RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, + ProtocolSupport, RequestId, RequestResponse, RequestResponseConfig, RequestResponseEvent, + RequestResponseMessage, }, - swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, NetworkBehaviour, PeerId, }; use serde::{Deserialize, Serialize}; -use std::{ - collections::VecDeque, - task::{Context, Poll}, - time::Duration, -}; -use tracing::{debug, error}; +use std::time::Duration; +use tracing::debug; #[derive(Clone, Copy, Debug, Serialize, Deserialize)] pub struct SwapRequest { @@ -25,40 +20,26 @@ pub struct SwapRequest { pub btc_amount: bitcoin::Amount, } -#[derive(Copy, Clone, Debug)] -pub struct OutEvent { - pub swap_response: SwapResponse, +#[derive(Debug)] +pub enum OutEvent { + MsgReceived(SwapResponse), + Failure(Error), } /// A `NetworkBehaviour` that represents doing the negotiation of a swap. #[derive(NetworkBehaviour)] -#[behaviour(out_event = "OutEvent", poll_method = "poll")] +#[behaviour(out_event = "OutEvent", event_process = false)] #[allow(missing_debug_implementations)] pub struct Behaviour { - rr: RequestResponse>, - #[behaviour(ignore)] - events: VecDeque, + rr: RequestResponse>, } impl Behaviour { pub fn send(&mut self, alice: PeerId, swap_request: SwapRequest) -> Result { - let msg = BobToAlice::SwapRequest(Box::new(swap_request)); - let id = self.rr.send_request(&alice, msg); + let id = self.rr.send_request(&alice, swap_request); Ok(id) } - - fn poll( - &mut self, - _: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> { - if let Some(event) = self.events.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); - } - - Poll::Pending - } } impl Default for Behaviour { @@ -70,41 +51,37 @@ impl Default for Behaviour { Self { rr: RequestResponse::new( - Codec::default(), + CborCodec::default(), vec![(Swap, ProtocolSupport::Outbound)], config, ), - events: Default::default(), } } } -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { +impl From> for OutEvent { + fn from(event: RequestResponseEvent) -> Self { match event { RequestResponseEvent::Message { message: RequestResponseMessage::Request { .. }, .. - } => panic!("Bob should never get a request from Alice"), + } => OutEvent::Failure(anyhow!("Bob should never get a request from Alice")), RequestResponseEvent::Message { + peer, message: RequestResponseMessage::Response { response, .. }, .. } => { - if let AliceToBob::SwapResponse(swap_response) = response { - debug!("Received swap response"); - self.events.push_back(OutEvent { - swap_response: *swap_response, - }); - } + debug!("Received swap response from {}", peer); + OutEvent::MsgReceived(response) } RequestResponseEvent::InboundFailure { error, .. } => { - error!("Inbound failure: {:?}", error); + OutEvent::Failure(anyhow!("Inbound failure: {:?}", error)) } RequestResponseEvent::OutboundFailure { error, .. } => { - error!("Outbound failure: {:?}", error); + OutEvent::Failure(anyhow!("Outbound failure: {:?}", error)) } RequestResponseEvent::ResponseSent { .. } => { - error!("Bob does not send a swap response to Alice"); + OutEvent::Failure(anyhow!("Bob does not send a swap response to Alice")) } } } diff --git a/swap/src/protocol/bob/transfer_proof.rs b/swap/src/protocol/bob/transfer_proof.rs index 6caa3475..67eeb9c4 100644 --- a/swap/src/protocol/bob/transfer_proof.rs +++ b/swap/src/protocol/bob/transfer_proof.rs @@ -1,50 +1,42 @@ use crate::{ - network::request_response::{OneShotCodec, Request, Response, TransferProofProtocol, TIMEOUT}, + network::request_response::{CborCodec, TransferProofProtocol, TIMEOUT}, protocol::alice::TransferProof, }; +use anyhow::{anyhow, Error, Result}; use libp2p::{ request_response::{ - handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, - RequestResponseEvent, RequestResponseMessage, + ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, + RequestResponseMessage, ResponseChannel, }, - swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, NetworkBehaviour, }; -use std::{ - collections::VecDeque, - task::{Context, Poll}, - time::Duration, -}; -use tracing::{debug, error}; +use std::time::Duration; +use tracing::debug; #[derive(Debug)] pub enum OutEvent { - Msg(TransferProof), + MsgReceived { + msg: TransferProof, + channel: ResponseChannel<()>, + }, + AckSent, + Failure(Error), } /// A `NetworkBehaviour` that represents receiving the transfer proof from /// Alice. #[derive(NetworkBehaviour)] -#[behaviour(out_event = "OutEvent", poll_method = "poll")] +#[behaviour(out_event = "OutEvent", event_process = false)] #[allow(missing_debug_implementations)] pub struct Behaviour { - rr: RequestResponse>, - #[behaviour(ignore)] - events: VecDeque, + rr: RequestResponse>, } impl Behaviour { - fn poll( - &mut self, - _: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> - { - if let Some(event) = self.events.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); - } - - Poll::Pending + pub fn send_ack(&mut self, channel: ResponseChannel<()>) -> Result<()> { + self.rr + .send_response(channel, ()) + .map_err(|err| anyhow!("Failed to ack transfer proof: {:?}", err)) } } @@ -56,46 +48,42 @@ impl Default for Behaviour { Self { rr: RequestResponse::new( - OneShotCodec::default(), + CborCodec::default(), vec![(TransferProofProtocol, ProtocolSupport::Inbound)], config, ), - events: Default::default(), } } } -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { +impl From> for OutEvent { + fn from(event: RequestResponseEvent) -> Self { match event { RequestResponseEvent::Message { + peer, message: RequestResponseMessage::Request { request, channel, .. }, .. } => { - if let Request::TransferProof(msg) = request { - debug!("Received Transfer Proof"); - self.events.push_back(OutEvent::Msg(*msg)); - // Send back empty response so that the request/response protocol completes. - let _ = self - .rr - .send_response(channel, Response::TransferProof) - .map_err(|err| error!("Failed to send message 3: {:?}", err)); + debug!("Received Transfer Proof from {}", peer); + OutEvent::MsgReceived { + msg: request, + channel, } } RequestResponseEvent::Message { message: RequestResponseMessage::Response { .. }, .. - } => panic!("Bob should not get a Response"), + } => OutEvent::Failure(anyhow!("Bob should not get a Response")), RequestResponseEvent::InboundFailure { error, .. } => { - error!("Inbound failure: {:?}", error); + OutEvent::Failure(anyhow!("Inbound failure: {:?}", error)) } RequestResponseEvent::OutboundFailure { error, .. } => { - error!("Outbound failure: {:?}", error); + OutEvent::Failure(anyhow!("Outbound failure: {:?}", error)) } - RequestResponseEvent::ResponseSent { .. } => debug!("Bob ack'd transfer proof message"), + RequestResponseEvent::ResponseSent { .. } => OutEvent::AckSent, } } } diff --git a/swap/tests/happy_path_restart_alice.rs b/swap/tests/happy_path_restart_alice.rs index fe38a09d..fbf2515c 100644 --- a/swap/tests/happy_path_restart_alice.rs +++ b/swap/tests/happy_path_restart_alice.rs @@ -15,10 +15,10 @@ async fn given_alice_restarts_after_encsig_is_learned_resume_swap() { let alice_state = alice::run_until(alice_swap, is_encsig_learned) .await .unwrap(); - assert!(matches!(alice_state, AliceState::EncSigLearned {..})); + assert!(matches!(alice_state, AliceState::EncSigLearned { .. })); let alice_swap = ctx.stop_and_resume_alice_from_db(alice_join_handle).await; - assert!(matches!(alice_swap.state, AliceState::EncSigLearned {..})); + assert!(matches!(alice_swap.state, AliceState::EncSigLearned { .. })); let alice_state = alice::run(alice_swap).await.unwrap(); diff --git a/swap/tests/happy_path_restart_bob_after_comm.rs b/swap/tests/happy_path_restart_bob_after_comm.rs index 09024be6..87a3b861 100644 --- a/swap/tests/happy_path_restart_bob_after_comm.rs +++ b/swap/tests/happy_path_restart_bob_after_comm.rs @@ -14,10 +14,10 @@ async fn given_bob_restarts_after_encsig_is_sent_resume_swap() { let bob_state = bob::run_until(bob_swap, is_encsig_sent).await.unwrap(); - assert!(matches!(bob_state, BobState::EncSigSent {..})); + assert!(matches!(bob_state, BobState::EncSigSent { .. })); let bob_swap = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; - assert!(matches!(bob_swap.state, BobState::EncSigSent {..})); + assert!(matches!(bob_swap.state, BobState::EncSigSent { .. })); let bob_state = bob::run(bob_swap).await.unwrap(); diff --git a/swap/tests/happy_path_restart_bob_after_lock_proof_received.rs b/swap/tests/happy_path_restart_bob_after_lock_proof_received.rs index e742fbed..7101c0ab 100644 --- a/swap/tests/happy_path_restart_bob_after_lock_proof_received.rs +++ b/swap/tests/happy_path_restart_bob_after_lock_proof_received.rs @@ -16,11 +16,13 @@ async fn given_bob_restarts_after_lock_proof_received_resume_swap() { .await .unwrap(); - assert!(matches!(bob_state, BobState::XmrLockProofReceived {..})); + assert!(matches!(bob_state, BobState::XmrLockProofReceived { .. })); let bob_swap = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; - assert!(matches!(bob_swap.state, BobState::XmrLockProofReceived - {..})); + assert!(matches!( + bob_swap.state, + BobState::XmrLockProofReceived { .. } + )); let bob_state = bob::run(bob_swap).await.unwrap(); diff --git a/swap/tests/happy_path_restart_bob_before_comm.rs b/swap/tests/happy_path_restart_bob_before_comm.rs index 06306cca..fd5f8979 100644 --- a/swap/tests/happy_path_restart_bob_before_comm.rs +++ b/swap/tests/happy_path_restart_bob_before_comm.rs @@ -14,10 +14,10 @@ async fn given_bob_restarts_after_xmr_is_locked_resume_swap() { let bob_state = bob::run_until(bob_swap, is_xmr_locked).await.unwrap(); - assert!(matches!(bob_state, BobState::XmrLocked {..})); + assert!(matches!(bob_state, BobState::XmrLocked { .. })); let bob_swap = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; - assert!(matches!(bob_swap.state, BobState::XmrLocked {..})); + assert!(matches!(bob_swap.state, BobState::XmrLocked { .. })); let bob_state = bob::run(bob_swap).await.unwrap(); diff --git a/swap/tests/punish.rs b/swap/tests/punish.rs index 7da44631..61de0577 100644 --- a/swap/tests/punish.rs +++ b/swap/tests/punish.rs @@ -16,7 +16,7 @@ async fn alice_punishes_if_bob_never_acts_after_fund() { let bob_state = bob::run_until(bob_swap, is_btc_locked).await.unwrap(); - assert!(matches!(bob_state, BobState::BtcLocked {..})); + assert!(matches!(bob_state, BobState::BtcLocked { .. })); let alice_state = alice_handle.await.unwrap(); ctx.assert_alice_punished(alice_state.unwrap()).await; @@ -24,7 +24,7 @@ async fn alice_punishes_if_bob_never_acts_after_fund() { // Restart Bob after Alice punished to ensure Bob transitions to // punished and does not run indefinitely let bob_swap = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; - assert!(matches!(bob_swap.state, BobState::BtcLocked {..})); + assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); let bob_state = bob::run(bob_swap).await.unwrap(); diff --git a/swap/tests/refund_restart_alice.rs b/swap/tests/refund_restart_alice.rs index 475212f3..5af2c7a0 100644 --- a/swap/tests/refund_restart_alice.rs +++ b/swap/tests/refund_restart_alice.rs @@ -15,8 +15,7 @@ async fn given_alice_restarts_after_xmr_is_locked_refund_swap() { let bob_handle = tokio::spawn(bob); let alice_state = alice::run_until(alice_swap, is_xmr_locked).await.unwrap(); - assert!(matches!(alice_state, - AliceState::XmrLocked {..})); + assert!(matches!(alice_state, AliceState::XmrLocked { .. })); // Alice does not act, Bob refunds let bob_state = bob_handle.await.unwrap(); @@ -24,7 +23,7 @@ async fn given_alice_restarts_after_xmr_is_locked_refund_swap() { // Once bob has finished Alice is restarted and refunds as well let alice_swap = ctx.stop_and_resume_alice_from_db(alice_join_handle).await; - assert!(matches!(alice_swap.state, AliceState::XmrLocked {..})); + assert!(matches!(alice_swap.state, AliceState::XmrLocked { .. })); let alice_state = alice::run(alice_swap).await.unwrap(); diff --git a/swap/tests/refund_restart_alice_cancelled.rs b/swap/tests/refund_restart_alice_cancelled.rs index 116c1fd4..129887a1 100644 --- a/swap/tests/refund_restart_alice_cancelled.rs +++ b/swap/tests/refund_restart_alice_cancelled.rs @@ -22,7 +22,7 @@ async fn given_alice_restarts_after_enc_sig_learned_and_bob_already_cancelled_re .await .unwrap(); assert!( - matches!(alice_state, AliceState::EncSigLearned {..}), + matches!(alice_state, AliceState::EncSigLearned { .. }), "Alice state is not EncSigLearned: {:?}", alice_state ); @@ -34,8 +34,7 @@ async fn given_alice_restarts_after_enc_sig_learned_and_bob_already_cancelled_re // Once bob has finished Alice is restarted and refunds as well let alice_swap = ctx.stop_and_resume_alice_from_db(alice_join_handle).await; assert!( - matches!(alice_swap.state, AliceState::EncSigLearned - {..}), + matches!(alice_swap.state, AliceState::EncSigLearned { .. }), "Alice state is not EncSigLearned: {:?}", alice_state ); diff --git a/swap/tests/testutils/mod.rs b/swap/tests/testutils/mod.rs index 1d435958..847af388 100644 --- a/swap/tests/testutils/mod.rs +++ b/swap/tests/testutils/mod.rs @@ -478,17 +478,11 @@ pub mod alice_run_until { use swap::protocol::alice::AliceState; pub fn is_xmr_locked(state: &AliceState) -> bool { - matches!( - state, - AliceState::XmrLocked{..} - ) + matches!(state, AliceState::XmrLocked { .. }) } pub fn is_encsig_learned(state: &AliceState) -> bool { - matches!( - state, - AliceState::EncSigLearned{..} - ) + matches!(state, AliceState::EncSigLearned { .. }) } }