diff --git a/docs/sequence.puml b/docs/sequence.puml index 189856db..11638d4b 100644 --- a/docs/sequence.puml +++ b/docs/sequence.puml @@ -46,10 +46,10 @@ group Execution Alice ->> Monero: Lock - Alice -> Bob: Message4 + Alice -> Bob: TransferProof note right: xmr lock tx transfer proof\nThis can be removed if Bob watches the blockchain. - Bob -> Alice: Message5 + Bob -> Alice: EncryptedSignature note left: redeem tx enc sig S_a Alice ->> Bitcoin: Redeem diff --git a/swap/src/database/alice.rs b/swap/src/database/alice.rs index 939ff550..cc004bcf 100644 --- a/swap/src/database/alice.rs +++ b/swap/src/database/alice.rs @@ -5,6 +5,7 @@ use crate::{ protocol::{alice, alice::AliceState, SwapAmounts}, }; use ::bitcoin::hashes::core::fmt::Display; +use libp2p::PeerId; use serde::{Deserialize, Serialize}; // Large enum variant is fine because this is only used for database @@ -16,8 +17,16 @@ pub enum Alice { amounts: SwapAmounts, state0: alice::State0, }, - Negotiated(alice::State3), - BtcLocked(alice::State3), + Negotiated { + state3: alice::State3, + #[serde(with = "crate::serde_peer_id")] + bob_peer_id: PeerId, + }, + BtcLocked { + state3: alice::State3, + #[serde(with = "crate::serde_peer_id")] + bob_peer_id: PeerId, + }, XmrLocked(alice::State3), EncSigLearned { encrypted_signature: EncryptedSignature, @@ -45,8 +54,22 @@ pub enum AliceEndState { impl From<&AliceState> for Alice { fn from(alice_state: &AliceState) -> Self { match alice_state { - AliceState::Negotiated { state3, .. } => Alice::Negotiated(state3.as_ref().clone()), - AliceState::BtcLocked { state3, .. } => Alice::BtcLocked(state3.as_ref().clone()), + AliceState::Negotiated { + state3, + bob_peer_id, + .. + } => Alice::Negotiated { + state3: state3.as_ref().clone(), + bob_peer_id: bob_peer_id.clone(), + }, + AliceState::BtcLocked { + state3, + bob_peer_id, + .. + } => Alice::BtcLocked { + state3: state3.as_ref().clone(), + bob_peer_id: bob_peer_id.clone(), + }, AliceState::XmrLocked { state3 } => Alice::XmrLocked(state3.as_ref().clone()), AliceState::EncSigLearned { state3, @@ -82,16 +105,22 @@ impl From for AliceState { fn from(db_state: Alice) -> Self { match db_state { Alice::Started { amounts, state0 } => AliceState::Started { amounts, state0 }, - Alice::Negotiated(state3) => AliceState::Negotiated { - channel: None, + Alice::Negotiated { + state3, + bob_peer_id, + } => AliceState::Negotiated { + bob_peer_id, amounts: SwapAmounts { btc: state3.btc, xmr: state3.xmr, }, state3: Box::new(state3), }, - Alice::BtcLocked(state3) => AliceState::BtcLocked { - channel: None, + Alice::BtcLocked { + state3, + bob_peer_id, + } => AliceState::BtcLocked { + bob_peer_id, amounts: SwapAmounts { btc: state3.btc, xmr: state3.xmr, @@ -157,8 +186,8 @@ impl Display for Alice { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Alice::Started { .. } => write!(f, "Started"), - Alice::Negotiated(_) => f.write_str("Negotiated"), - Alice::BtcLocked(_) => f.write_str("Bitcoin locked"), + Alice::Negotiated { .. } => f.write_str("Negotiated"), + Alice::BtcLocked { .. } => f.write_str("Bitcoin locked"), Alice::XmrLocked(_) => f.write_str("Monero locked"), Alice::CancelTimelockExpired(_) => f.write_str("Cancel timelock is expired"), Alice::BtcCancelled(_) => f.write_str("Bitcoin cancel transaction published"), diff --git a/swap/src/lib.rs b/swap/src/lib.rs index d85355a7..5e51f8d5 100644 --- a/swap/src/lib.rs +++ b/swap/src/lib.rs @@ -26,3 +26,4 @@ pub mod seed; pub mod trace; mod fs; +mod serde_peer_id; diff --git a/swap/src/main.rs b/swap/src/main.rs index 06fd7c00..e5c23fad 100644 --- a/swap/src/main.rs +++ b/swap/src/main.rs @@ -25,16 +25,18 @@ use tracing::{info, log::LevelFilter}; use uuid::Uuid; pub mod bitcoin; -mod cli; pub mod config; pub mod database; -mod fs; pub mod monero; pub mod network; pub mod protocol; pub mod seed; pub mod trace; +mod cli; +mod fs; +mod serde_peer_id; + #[macro_use] extern crate prettytable; diff --git a/swap/src/network/request_response.rs b/swap/src/network/request_response.rs index 1fd46224..29681595 100644 --- a/swap/src/network/request_response.rs +++ b/swap/src/network/request_response.rs @@ -1,8 +1,8 @@ -use crate::protocol::{alice, bob}; +use crate::protocol::{alice, alice::TransferProof, bob, bob::EncryptedSignature}; use async_trait::async_trait; use futures::prelude::*; use libp2p::{ - core::upgrade, + core::{upgrade, upgrade::ReadOneError}, request_response::{ProtocolName, RequestResponseCodec}, }; use serde::{Deserialize, Serialize}; @@ -21,21 +21,34 @@ const BUF_SIZE: usize = 1024 * 1024; /// Messages Bob sends to Alice. #[derive(Clone, Debug, Serialize, Deserialize)] pub enum BobToAlice { - SwapRequest(bob::SwapRequest), + SwapRequest(Box), Message0(Box), - Message1(bob::Message1), - Message2(bob::Message2), - Message3(bob::Message3), + Message1(Box), + Message2(Box), } /// Messages Alice sends to Bob. #[derive(Clone, Debug, Serialize, Deserialize)] pub enum AliceToBob { - SwapResponse(alice::SwapResponse), + SwapResponse(Box), Message0(Box), Message1(Box), - Message2(alice::Message2), - Message3, // empty response + Message2, +} + +/// Messages sent from one party to the other. +/// All responses are empty +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum Request { + TransferProof(Box), + EncryptedSignature(Box), +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +/// Response are only used for acknowledgement purposes. +pub enum Response { + TransferProof, + EncryptedSignature, } #[derive(Debug, Clone, Copy, Default)] @@ -51,7 +64,10 @@ pub struct Message1Protocol; pub struct Message2Protocol; #[derive(Debug, Clone, Copy, Default)] -pub struct Message3Protocol; +pub struct TransferProofProtocol; + +#[derive(Debug, Clone, Copy, Default)] +pub struct EncryptedSignatureProtocol; impl ProtocolName for Swap { fn protocol_name(&self) -> &[u8] { @@ -77,9 +93,15 @@ impl ProtocolName for Message2Protocol { } } -impl ProtocolName for Message3Protocol { +impl ProtocolName for TransferProofProtocol { fn protocol_name(&self) -> &[u8] { - b"/xmr/btc/message3/1.0.0" + b"/xmr/btc/transfer_proof/1.0.0" + } +} + +impl ProtocolName for EncryptedSignatureProtocol { + fn protocol_name(&self) -> &[u8] { + b"/xmr/btc/encrypted_signature/1.0.0" } } @@ -171,3 +193,93 @@ where Ok(()) } } + +#[derive(Clone, Copy, Debug, Default)] +pub struct OneShotCodec

{ + phantom: PhantomData

, +} + +#[async_trait] +impl

RequestResponseCodec for OneShotCodec

+where + P: Send + Sync + Clone + ProtocolName, +{ + type Protocol = P; + type Request = Request; + type Response = Response; + + async fn read_request(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + debug!("enter read_request"); + let message = upgrade::read_one(io, BUF_SIZE).await.map_err(|e| match e { + ReadOneError::Io(err) => err, + e => io::Error::new(io::ErrorKind::Other, e), + })?; + let mut de = serde_cbor::Deserializer::from_slice(&message); + let msg = Request::deserialize(&mut de).map_err(|e| { + tracing::debug!("serde read_request error: {:?}", e); + io::Error::new(io::ErrorKind::Other, e) + })?; + + Ok(msg) + } + + async fn read_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + debug!("enter read_response"); + let message = upgrade::read_one(io, BUF_SIZE) + .await + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + let mut de = serde_cbor::Deserializer::from_slice(&message); + let msg = Response::deserialize(&mut de).map_err(|e| { + tracing::debug!("serde read_response error: {:?}", e); + io::Error::new(io::ErrorKind::InvalidData, e) + })?; + + Ok(msg) + } + + async fn write_request( + &mut self, + _: &Self::Protocol, + io: &mut T, + req: Self::Request, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + let bytes = + serde_cbor::to_vec(&req).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + + upgrade::write_one(io, &bytes).await?; + + Ok(()) + } + + async fn write_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + res: Self::Response, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + debug!("enter write_response"); + let bytes = serde_cbor::to_vec(&res).map_err(|e| { + tracing::debug!("serde write_reponse error: {:?}", e); + io::Error::new(io::ErrorKind::InvalidData, e) + })?; + upgrade::write_one(io, &bytes).await?; + + Ok(()) + } +} diff --git a/swap/src/protocol/alice.rs b/swap/src/protocol/alice.rs index 51c7134b..22af28df 100644 --- a/swap/src/protocol/alice.rs +++ b/swap/src/protocol/alice.rs @@ -4,10 +4,10 @@ pub use self::{ event_loop::{EventLoop, EventLoopHandle}, message0::Message0, message1::Message1, - message2::Message2, state::*, swap::{run, run_until}, swap_response::*, + transfer_proof::TransferProof, }; use crate::{ bitcoin, @@ -21,7 +21,7 @@ use crate::{ transport::build, Seed as NetworkSeed, }, - protocol::{bob, SwapAmounts}, + protocol::{bob, bob::EncryptedSignature, SwapAmounts}, seed::Seed, }; use anyhow::{bail, Result}; @@ -33,15 +33,16 @@ use std::{path::PathBuf, sync::Arc}; use tracing::{debug, info}; use uuid::Uuid; +mod encrypted_signature; pub mod event_loop; mod message0; mod message1; mod message2; -mod message3; pub mod state; mod steps; pub mod swap; mod swap_response; +mod transfer_proof; pub struct Swap { pub state: AliceState, @@ -232,10 +233,11 @@ pub enum OutEvent { channel: ResponseChannel, }, Message2 { - msg: bob::Message2, - channel: ResponseChannel, + msg: Box, + bob_peer_id: PeerId, }, - Message3(bob::Message3), + TransferProof, + EncryptedSignature(EncryptedSignature), } impl From for OutEvent { @@ -276,15 +278,26 @@ impl From for OutEvent { impl From for OutEvent { fn from(event: message2::OutEvent) -> Self { match event { - message2::OutEvent::Msg { msg, channel } => OutEvent::Message2 { msg, channel }, + message2::OutEvent::Msg { msg, bob_peer_id } => OutEvent::Message2 { + msg: Box::new(msg), + bob_peer_id, + }, } } } -impl From for OutEvent { - fn from(event: message3::OutEvent) -> Self { +impl From for OutEvent { + fn from(event: transfer_proof::OutEvent) -> Self { match event { - message3::OutEvent::Msg(msg) => OutEvent::Message3(msg), + transfer_proof::OutEvent::Msg => OutEvent::TransferProof, + } + } +} + +impl From for OutEvent { + fn from(event: encrypted_signature::OutEvent) -> Self { + match event { + encrypted_signature::OutEvent::Msg(msg) => OutEvent::EncryptedSignature(msg), } } } @@ -299,7 +312,8 @@ pub struct Behaviour { message0: message0::Behaviour, message1: message1::Behaviour, message2: message2::Behaviour, - message3: message3::Behaviour, + transfer_proof: transfer_proof::Behaviour, + encrypted_signature: encrypted_signature::Behaviour, } impl Behaviour { @@ -325,9 +339,9 @@ impl Behaviour { debug!("Sent Message1"); } - /// Send Message2 to Bob in response to receiving his Message2. - pub fn send_message2(&mut self, channel: ResponseChannel, msg: Message2) { - self.message2.send(channel, msg); - debug!("Sent Message2"); + /// Send Transfer Proof to Bob. + pub fn send_transfer_proof(&mut self, bob: PeerId, msg: TransferProof) { + self.transfer_proof.send(bob, msg); + debug!("Sent Transfer Proof"); } } diff --git a/swap/src/protocol/alice/message3.rs b/swap/src/protocol/alice/encrypted_signature.rs similarity index 67% rename from swap/src/protocol/alice/message3.rs rename to swap/src/protocol/alice/encrypted_signature.rs index 9c85cc96..ca1ea7f3 100644 --- a/swap/src/protocol/alice/message3.rs +++ b/swap/src/protocol/alice/encrypted_signature.rs @@ -1,6 +1,8 @@ use crate::{ - network::request_response::{AliceToBob, BobToAlice, Codec, Message3Protocol, TIMEOUT}, - protocol::bob, + network::request_response::{ + EncryptedSignatureProtocol, OneShotCodec, Request, Response, TIMEOUT, + }, + protocol::bob::EncryptedSignature, }; use libp2p::{ request_response::{ @@ -19,15 +21,16 @@ use tracing::{debug, error}; #[derive(Debug)] pub enum OutEvent { - Msg(bob::Message3), + Msg(EncryptedSignature), } -/// A `NetworkBehaviour` that represents receiving of message 3 from Bob. +/// A `NetworkBehaviour` that represents receiving the Bitcoin encrypted +/// signature from Bob. #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] pub struct Behaviour { - rr: RequestResponse>, + rr: RequestResponse>, #[behaviour(ignore)] events: VecDeque, } @@ -37,7 +40,9 @@ impl Behaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> { + ) -> Poll< + NetworkBehaviourAction>, OutEvent>, + > { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } @@ -54,8 +59,8 @@ impl Default for Behaviour { Self { rr: RequestResponse::new( - Codec::default(), - vec![(Message3Protocol, ProtocolSupport::Full)], + OneShotCodec::default(), + vec![(EncryptedSignatureProtocol, ProtocolSupport::Inbound)], config, ), events: Default::default(), @@ -63,8 +68,8 @@ impl Default for Behaviour { } } -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { +impl NetworkBehaviourEventProcess> for Behaviour { + fn inject_event(&mut self, event: RequestResponseEvent) { match event { RequestResponseEvent::Message { message: @@ -73,11 +78,11 @@ impl NetworkBehaviourEventProcess> }, .. } => { - if let BobToAlice::Message3(msg) = request { - debug!("Received Message3"); - self.events.push_back(OutEvent::Msg(msg)); + if let Request::EncryptedSignature(msg) = request { + debug!("Received encrypted signature"); + self.events.push_back(OutEvent::Msg(*msg)); // Send back empty response so that the request/response protocol completes. - self.rr.send_response(channel, AliceToBob::Message3); + let _ = self.rr.send_response(channel, Response::EncryptedSignature); } } RequestResponseEvent::Message { diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index a648d1a6..b4168788 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -2,8 +2,9 @@ use crate::{ network::{request_response::AliceToBob, transport::SwapTransport, TokioExecutor}, protocol::{ alice, - alice::{Behaviour, OutEvent, SwapResponse}, + alice::{Behaviour, OutEvent, SwapResponse, TransferProof}, bob, + bob::EncryptedSignature, }, }; use anyhow::{anyhow, Context, Result}; @@ -12,6 +13,7 @@ use libp2p::{ core::Multiaddr, futures::StreamExt, request_response::ResponseChannel, PeerId, Swarm, }; use tokio::sync::mpsc::{Receiver, Sender}; +use tracing::trace; #[allow(missing_debug_implementations)] pub struct Channels { @@ -34,16 +36,16 @@ impl Default for Channels { #[derive(Debug)] pub struct EventLoopHandle { - msg0: Receiver<(bob::Message0, ResponseChannel)>, - msg1: Receiver<(bob::Message1, ResponseChannel)>, - msg2: Receiver<(bob::Message2, ResponseChannel)>, - msg3: Receiver, + recv_message0: Receiver<(bob::Message0, ResponseChannel)>, + recv_message1: Receiver<(bob::Message1, ResponseChannel)>, + recv_message2: Receiver, + recv_encrypted_signature: Receiver, request: Receiver, conn_established: Receiver, send_swap_response: Sender<(ResponseChannel, SwapResponse)>, - send_msg0: Sender<(ResponseChannel, alice::Message0)>, - send_msg1: Sender<(ResponseChannel, alice::Message1)>, - send_msg2: Sender<(ResponseChannel, alice::Message2)>, + send_message0: Sender<(ResponseChannel, alice::Message0)>, + send_message1: Sender<(ResponseChannel, alice::Message1)>, + send_transfer_proof: Sender<(PeerId, TransferProof)>, } impl EventLoopHandle { @@ -55,28 +57,28 @@ impl EventLoopHandle { } pub async fn recv_message0(&mut self) -> Result<(bob::Message0, ResponseChannel)> { - self.msg0 + self.recv_message0 .recv() .await .ok_or_else(|| anyhow!("Failed to receive message 0 from Bob")) } pub async fn recv_message1(&mut self) -> Result<(bob::Message1, ResponseChannel)> { - self.msg1 + self.recv_message1 .recv() .await .ok_or_else(|| anyhow!("Failed to receive message 1 from Bob")) } - pub async fn recv_message2(&mut self) -> Result<(bob::Message2, ResponseChannel)> { - self.msg2 + pub async fn recv_message2(&mut self) -> Result { + self.recv_message2 .recv() .await - .ok_or_else(|| anyhow!("Failed o receive message 2 from Bob")) + .ok_or_else(|| anyhow!("Failed to receive message 2 from Bob")) } - pub async fn recv_message3(&mut self) -> Result { - self.msg3 + pub async fn recv_encrypted_signature(&mut self) -> Result { + self.recv_encrypted_signature .recv() .await .ok_or_else(|| anyhow!("Failed to receive Bitcoin encrypted signature from Bob")) @@ -108,7 +110,7 @@ impl EventLoopHandle { channel: ResponseChannel, msg: alice::Message0, ) -> Result<()> { - let _ = self.send_msg0.send((channel, msg)).await?; + let _ = self.send_message0.send((channel, msg)).await?; Ok(()) } @@ -117,16 +119,12 @@ impl EventLoopHandle { channel: ResponseChannel, msg: alice::Message1, ) -> Result<()> { - let _ = self.send_msg1.send((channel, msg)).await?; + let _ = self.send_message1.send((channel, msg)).await?; Ok(()) } - pub async fn send_message2( - &mut self, - channel: ResponseChannel, - msg: alice::Message2, - ) -> Result<()> { - let _ = self.send_msg2.send((channel, msg)).await?; + pub async fn send_transfer_proof(&mut self, bob: PeerId, msg: TransferProof) -> Result<()> { + let _ = self.send_transfer_proof.send((bob, msg)).await?; Ok(()) } } @@ -134,16 +132,16 @@ impl EventLoopHandle { #[allow(missing_debug_implementations)] pub struct EventLoop { swarm: libp2p::Swarm, - msg0: Sender<(bob::Message0, ResponseChannel)>, - msg1: Sender<(bob::Message1, ResponseChannel)>, - msg2: Sender<(bob::Message2, ResponseChannel)>, - msg3: Sender, + recv_message0: Sender<(bob::Message0, ResponseChannel)>, + recv_message1: Sender<(bob::Message1, ResponseChannel)>, + recv_message2: Sender, + recv_encrypted_signature: Sender, request: Sender, conn_established: Sender, send_swap_response: Receiver<(ResponseChannel, SwapResponse)>, - send_msg0: Receiver<(ResponseChannel, alice::Message0)>, - send_msg1: Receiver<(ResponseChannel, alice::Message1)>, - send_msg2: Receiver<(ResponseChannel, alice::Message2)>, + send_message0: Receiver<(ResponseChannel, alice::Message0)>, + send_message1: Receiver<(ResponseChannel, alice::Message1)>, + send_transfer_proof: Receiver<(PeerId, TransferProof)>, } impl EventLoop { @@ -162,42 +160,42 @@ impl EventLoop { Swarm::listen_on(&mut swarm, listen.clone()) .with_context(|| format!("Address is not supported: {:#}", listen))?; - let msg0 = Channels::new(); - let msg1 = Channels::new(); - let msg2 = Channels::new(); - let msg3 = Channels::new(); + let recv_message0 = Channels::new(); + let recv_message1 = Channels::new(); + let recv_message2 = Channels::new(); + let recv_encrypted_signature = Channels::new(); let request = Channels::new(); let conn_established = Channels::new(); let send_swap_response = Channels::new(); - let send_msg0 = Channels::new(); - let send_msg1 = Channels::new(); - let send_msg2 = Channels::new(); + let send_message0 = Channels::new(); + let send_message1 = Channels::new(); + let send_transfer_proof = Channels::new(); let driver = EventLoop { swarm, - msg0: msg0.sender, - msg1: msg1.sender, - msg2: msg2.sender, - msg3: msg3.sender, + recv_message0: recv_message0.sender, + recv_message1: recv_message1.sender, + recv_message2: recv_message2.sender, + recv_encrypted_signature: recv_encrypted_signature.sender, request: request.sender, conn_established: conn_established.sender, send_swap_response: send_swap_response.receiver, - send_msg0: send_msg0.receiver, - send_msg1: send_msg1.receiver, - send_msg2: send_msg2.receiver, + send_message0: send_message0.receiver, + send_message1: send_message1.receiver, + send_transfer_proof: send_transfer_proof.receiver, }; let handle = EventLoopHandle { - msg0: msg0.receiver, - msg1: msg1.receiver, - msg2: msg2.receiver, - msg3: msg3.receiver, + recv_message0: recv_message0.receiver, + recv_message1: recv_message1.receiver, + recv_message2: recv_message2.receiver, + recv_encrypted_signature: recv_encrypted_signature.receiver, request: request.receiver, conn_established: conn_established.receiver, send_swap_response: send_swap_response.sender, - send_msg0: send_msg0.sender, - send_msg1: send_msg1.sender, - send_msg2: send_msg2.sender, + send_message0: send_message0.sender, + send_message1: send_message1.sender, + send_transfer_proof: send_transfer_proof.sender, }; Ok((driver, handle)) @@ -212,16 +210,17 @@ impl EventLoop { let _ = self.conn_established.send(alice).await; } OutEvent::Message0 { msg, channel } => { - let _ = self.msg0.send((*msg, channel)).await; + let _ = self.recv_message0.send((*msg, channel)).await; } OutEvent::Message1 { msg, channel } => { - let _ = self.msg1.send((msg, channel)).await; + let _ = self.recv_message1.send((msg, channel)).await; } - OutEvent::Message2 { msg, channel } => { - let _ = self.msg2.send((msg, channel)).await; + OutEvent::Message2 { msg, bob_peer_id : _} => { + let _ = self.recv_message2.send(*msg).await; } - OutEvent::Message3(msg) => { - let _ = self.msg3.send(msg).await; + OutEvent::TransferProof => trace!("Bob ack'd receiving the transfer proof"), + OutEvent::EncryptedSignature(msg) => { + let _ = self.recv_encrypted_signature.send(msg).await; } OutEvent::Request(event) => { let _ = self.request.send(*event).await; @@ -233,19 +232,19 @@ impl EventLoop { self.swarm.send_swap_response(channel, swap_response); } }, - msg0 = self.send_msg0.next().fuse() => { + msg0 = self.send_message0.next().fuse() => { if let Some((channel, msg)) = msg0 { self.swarm.send_message0(channel, msg); } }, - msg1 = self.send_msg1.next().fuse() => { + msg1 = self.send_message1.next().fuse() => { if let Some((channel, msg)) = msg1 { self.swarm.send_message1(channel, msg); } }, - msg2 = self.send_msg2.next().fuse() => { - if let Some((channel, msg)) = msg2 { - self.swarm.send_message2(channel, msg); + transfer_proof = self.send_transfer_proof.next().fuse() => { + if let Some((bob_peer_id, msg)) = transfer_proof { + self.swarm.send_transfer_proof(bob_peer_id, msg); } }, } diff --git a/swap/src/protocol/alice/message1.rs b/swap/src/protocol/alice/message1.rs index 95050905..497c8625 100644 --- a/swap/src/protocol/alice/message1.rs +++ b/swap/src/protocol/alice/message1.rs @@ -93,7 +93,7 @@ impl NetworkBehaviourEventProcess> } => { if let BobToAlice::Message1(msg) = request { debug!("Received Message1"); - self.events.push_back(OutEvent::Msg { msg, channel }); + self.events.push_back(OutEvent::Msg { msg: *msg, channel }); } } RequestResponseEvent::Message { diff --git a/swap/src/protocol/alice/message2.rs b/swap/src/protocol/alice/message2.rs index d383ff11..306b9be5 100644 --- a/swap/src/protocol/alice/message2.rs +++ b/swap/src/protocol/alice/message2.rs @@ -1,17 +1,15 @@ use crate::{ - monero, network::request_response::{AliceToBob, BobToAlice, Codec, Message2Protocol, TIMEOUT}, protocol::bob, }; use libp2p::{ request_response::{ handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, - RequestResponseEvent, RequestResponseMessage, ResponseChannel, + RequestResponseEvent, RequestResponseMessage, }, swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, - NetworkBehaviour, + NetworkBehaviour, PeerId, }; -use serde::{Deserialize, Serialize}; use std::{ collections::VecDeque, task::{Context, Poll}, @@ -22,18 +20,11 @@ use tracing::{debug, error}; #[derive(Debug)] pub enum OutEvent { Msg { - /// Received message from Bob. msg: bob::Message2, - /// Channel to send back Alice's message 2. - channel: ResponseChannel, + bob_peer_id: PeerId, }, } -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Message2 { - pub tx_lock_proof: monero::TransferProof, -} - /// A `NetworkBehaviour` that represents receiving of message 2 from Bob. #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent", poll_method = "poll")] @@ -45,11 +36,6 @@ pub struct Behaviour { } impl Behaviour { - pub fn send(&mut self, channel: ResponseChannel, msg: Message2) { - let msg = AliceToBob::Message2(msg); - self.rr.send_response(channel, msg); - } - fn poll( &mut self, _: &mut Context<'_>, @@ -84,15 +70,20 @@ impl NetworkBehaviourEventProcess> fn inject_event(&mut self, event: RequestResponseEvent) { match event { RequestResponseEvent::Message { + peer, message: RequestResponseMessage::Request { request, channel, .. }, - .. } => { if let BobToAlice::Message2(msg) = request { - debug!("Received Message2"); - self.events.push_back(OutEvent::Msg { msg, channel }); + debug!("Received Message 2"); + self.events.push_back(OutEvent::Msg { + msg: *msg, + bob_peer_id: peer, + }); + // Send back empty response so that the request/response protocol completes. + let _ = self.rr.send_response(channel, AliceToBob::Message2); } } RequestResponseEvent::Message { diff --git a/swap/src/protocol/alice/state.rs b/swap/src/protocol/alice/state.rs index e838568f..c88e23c0 100644 --- a/swap/src/protocol/alice/state.rs +++ b/swap/src/protocol/alice/state.rs @@ -8,15 +8,11 @@ use crate::{ }, monero, monero::CreateWalletForOutput, - network::request_response::AliceToBob, - protocol::{alice, bob, SwapAmounts}, + protocol::{alice, alice::TransferProof, bob, bob::EncryptedSignature, SwapAmounts}, }; use anyhow::{anyhow, Context, Result}; -use ecdsa_fun::{ - adaptor::{Adaptor, EncryptedSignature}, - nonce::Deterministic, -}; -use libp2p::request_response::ResponseChannel; +use ecdsa_fun::{adaptor::Adaptor, nonce::Deterministic}; +use libp2p::PeerId; use rand::{CryptoRng, RngCore}; use serde::{Deserialize, Serialize}; use sha2::Sha256; @@ -30,12 +26,12 @@ pub enum AliceState { state0: State0, }, Negotiated { - channel: Option>, + bob_peer_id: PeerId, amounts: SwapAmounts, state3: Box, }, BtcLocked { - channel: Option>, + bob_peer_id: PeerId, amounts: SwapAmounts, state3: Box, }, @@ -43,7 +39,7 @@ pub enum AliceState { state3: Box, }, EncSigLearned { - encrypted_signature: EncryptedSignature, + encrypted_signature: bitcoin::EncryptedSignature, state3: Box, }, BtcRedeemed, @@ -476,13 +472,13 @@ pub struct State5 { } impl State5 { - pub fn next_message(&self) -> alice::Message2 { - alice::Message2 { + pub fn next_message(&self) -> TransferProof { + TransferProof { tx_lock_proof: self.tx_lock_proof.clone(), } } - pub fn receive(self, msg: bob::Message3) -> State6 { + pub fn receive(self, msg: EncryptedSignature) -> State6 { State6 { a: self.a, B: self.B, @@ -554,7 +550,7 @@ pub struct State6 { tx_lock: bitcoin::TxLock, tx_punish_sig_bob: bitcoin::Signature, - tx_redeem_encsig: EncryptedSignature, + tx_redeem_encsig: bitcoin::EncryptedSignature, lock_xmr_fee: monero::Amount, } diff --git a/swap/src/protocol/alice/steps.rs b/swap/src/protocol/alice/steps.rs index d6be5ae8..37de2b34 100644 --- a/swap/src/protocol/alice/steps.rs +++ b/swap/src/protocol/alice/steps.rs @@ -10,10 +10,9 @@ use crate::{ config::Config, monero, monero::Transfer, - network::request_response::AliceToBob, protocol::{ alice, - alice::{event_loop::EventLoopHandle, SwapResponse}, + alice::{event_loop::EventLoopHandle, SwapResponse, TransferProof}, SwapAmounts, }, }; @@ -23,7 +22,7 @@ use futures::{ future::{select, Either}, pin_mut, }; -use libp2p::request_response::ResponseChannel; +use libp2p::PeerId; use rand::rngs::OsRng; use sha2::Sha256; use std::sync::Arc; @@ -35,11 +34,11 @@ pub async fn negotiate( xmr_amount: monero::Amount, event_loop_handle: &mut EventLoopHandle, config: Config, -) -> Result<(ResponseChannel, alice::State3)> { +) -> Result<(PeerId, alice::State3)> { trace!("Starting negotiate"); // todo: we can move this out, we dont need to timeout here - let _peer_id = timeout( + let bob_peer_id = timeout( config.bob_time_to_act, event_loop_handle.recv_conn_established(), ) @@ -73,12 +72,11 @@ pub async fn negotiate( .send_message1(channel, state2.next_message()) .await?; - let (bob_message2, channel) = - timeout(config.bob_time_to_act, event_loop_handle.recv_message2()).await??; + let bob_message2 = timeout(config.bob_time_to_act, event_loop_handle.recv_message2()).await??; let state3 = state2.receive(bob_message2)?; - Ok((channel, state3)) + Ok((bob_peer_id, state3)) } // TODO(Franck): Use helper functions from xmr-btc instead of re-writing them @@ -108,7 +106,7 @@ where } pub async fn lock_xmr( - channel: ResponseChannel, + bob_peer_id: PeerId, amounts: SwapAmounts, state3: alice::State3, event_loop_handle: &mut EventLoopHandle, @@ -134,7 +132,7 @@ where // Otherwise Alice might publish the lock tx twice! event_loop_handle - .send_message2(channel, alice::Message2 { + .send_transfer_proof(bob_peer_id, TransferProof { tx_lock_proof: transfer_proof, }) .await?; @@ -146,7 +144,7 @@ pub async fn wait_for_bitcoin_encrypted_signature( event_loop_handle: &mut EventLoopHandle, ) -> Result { let msg3 = event_loop_handle - .recv_message3() + .recv_encrypted_signature() .await .context("Failed to receive Bitcoin encrypted signature from Bob")?; diff --git a/swap/src/protocol/alice/swap.rs b/swap/src/protocol/alice/swap.rs index ecda7c63..2abe8e67 100644 --- a/swap/src/protocol/alice/swap.rs +++ b/swap/src/protocol/alice/swap.rs @@ -91,11 +91,11 @@ async fn run_until_internal( } else { match state { AliceState::Started { amounts, state0 } => { - let (channel, state3) = + let (bob_peer_id, state3) = negotiate(state0, amounts.xmr, &mut event_loop_handle, config).await?; let state = AliceState::Negotiated { - channel: Some(channel), + bob_peer_id, amounts, state3: Box::new(state3), }; @@ -117,30 +117,17 @@ async fn run_until_internal( } AliceState::Negotiated { state3, - channel, + bob_peer_id, amounts, } => { - let state = match channel { - Some(channel) => { - let _ = wait_for_locked_bitcoin( - state3.tx_lock.txid(), - bitcoin_wallet.clone(), - config, - ) + let _ = + wait_for_locked_bitcoin(state3.tx_lock.txid(), bitcoin_wallet.clone(), config) .await?; - AliceState::BtcLocked { - channel: Some(channel), - amounts, - state3, - } - } - None => { - tracing::info!("Cannot resume swap from negotiated state, aborting"); - - // Alice did not lock Xmr yet - AliceState::SafelyAborted - } + let state = AliceState::BtcLocked { + bob_peer_id, + amounts, + state3, }; let db_state = (&state).into(); @@ -159,30 +146,20 @@ async fn run_until_internal( .await } AliceState::BtcLocked { - channel, + bob_peer_id, amounts, state3, } => { - let state = match channel { - Some(channel) => { - lock_xmr( - channel, - amounts, - *state3.clone(), - &mut event_loop_handle, - monero_wallet.clone(), - ) - .await?; + lock_xmr( + bob_peer_id, + amounts, + *state3.clone(), + &mut event_loop_handle, + monero_wallet.clone(), + ) + .await?; - AliceState::XmrLocked { state3 } - } - None => { - tracing::info!("Cannot resume swap from BTC locked state, aborting"); - - // Alice did not lock Xmr yet - AliceState::SafelyAborted - } - }; + let state = AliceState::XmrLocked { state3 }; let db_state = (&state).into(); db.insert_latest_state(swap_id, database::Swap::Alice(db_state)) diff --git a/swap/src/protocol/alice/swap_response.rs b/swap/src/protocol/alice/swap_response.rs index 47f6ceab..031bd8dd 100644 --- a/swap/src/protocol/alice/swap_response.rs +++ b/swap/src/protocol/alice/swap_response.rs @@ -45,7 +45,7 @@ pub struct Behaviour { impl Behaviour { /// Alice always sends her messages as a response to a request from Bob. pub fn send(&mut self, channel: ResponseChannel, msg: SwapResponse) { - let msg = AliceToBob::SwapResponse(msg); + let msg = AliceToBob::SwapResponse(Box::new(msg)); self.rr.send_response(channel, msg); } @@ -92,7 +92,7 @@ impl NetworkBehaviourEventProcess> } => { if let BobToAlice::SwapRequest(msg) = request { debug!("Received swap request"); - self.events.push_back(OutEvent { msg, channel }) + self.events.push_back(OutEvent { msg: *msg, channel }) } } RequestResponseEvent::Message { diff --git a/swap/src/protocol/alice/transfer_proof.rs b/swap/src/protocol/alice/transfer_proof.rs new file mode 100644 index 00000000..b9f379a9 --- /dev/null +++ b/swap/src/protocol/alice/transfer_proof.rs @@ -0,0 +1,102 @@ +use crate::{ + monero, + network::request_response::{OneShotCodec, Request, Response, TransferProofProtocol, TIMEOUT}, +}; +use libp2p::{ + request_response::{ + handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, + RequestResponseEvent, RequestResponseMessage, + }, + swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, + NetworkBehaviour, PeerId, +}; +use serde::{Deserialize, Serialize}; +use std::{ + collections::VecDeque, + task::{Context, Poll}, + time::Duration, +}; +use tracing::error; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct TransferProof { + pub tx_lock_proof: monero::TransferProof, +} + +#[derive(Debug, Copy, Clone)] +pub enum OutEvent { + Msg, +} + +/// A `NetworkBehaviour` that represents sending the Monero transfer proof to +/// Bob. +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "OutEvent", poll_method = "poll")] +#[allow(missing_debug_implementations)] +pub struct Behaviour { + rr: RequestResponse>, + #[behaviour(ignore)] + events: VecDeque, +} + +impl Behaviour { + pub fn send(&mut self, bob: PeerId, msg: TransferProof) { + let msg = Request::TransferProof(Box::new(msg)); + let _id = self.rr.send_request(&bob, msg); + } + + fn poll( + &mut self, + _: &mut Context<'_>, + _: &mut impl PollParameters, + ) -> Poll>, OutEvent>> + { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + + Poll::Pending + } +} + +impl Default for Behaviour { + fn default() -> Self { + let timeout = Duration::from_secs(TIMEOUT); + let mut config = RequestResponseConfig::default(); + config.set_request_timeout(timeout); + + Self { + rr: RequestResponse::new( + OneShotCodec::default(), + vec![(TransferProofProtocol, ProtocolSupport::Outbound)], + config, + ), + events: Default::default(), + } + } +} + +impl NetworkBehaviourEventProcess> for Behaviour { + fn inject_event(&mut self, event: RequestResponseEvent) { + match event { + RequestResponseEvent::Message { + message: RequestResponseMessage::Request { .. }, + .. + } => panic!("Alice should never get a transfer proof request from Bob"), + RequestResponseEvent::Message { + message: RequestResponseMessage::Response { response, .. }, + .. + } => { + if let Response::TransferProof = response { + self.events.push_back(OutEvent::Msg); + } + } + RequestResponseEvent::InboundFailure { error, .. } => { + error!("Inbound failure: {:?}", error); + } + RequestResponseEvent::OutboundFailure { error, .. } => { + error!("Outbound failure: {:?}", error); + } + } + } +} diff --git a/swap/src/protocol/bob.rs b/swap/src/protocol/bob.rs index 20e8d2a2..f4ca2e52 100644 --- a/swap/src/protocol/bob.rs +++ b/swap/src/protocol/bob.rs @@ -2,7 +2,6 @@ //! Bob holds BTC and wishes receive XMR. use crate::{ bitcoin, - bitcoin::EncryptedSignature, config::Config, database, database::Database, @@ -22,24 +21,26 @@ use tracing::{debug, info}; use uuid::Uuid; pub use self::{ + encrypted_signature::EncryptedSignature, event_loop::{EventLoop, EventLoopHandle}, message0::Message0, message1::Message1, message2::Message2, - message3::Message3, state::*, swap::{run, run_until}, swap_request::*, }; +use crate::protocol::alice::TransferProof; +mod encrypted_signature; pub mod event_loop; mod message0; mod message1; mod message2; -mod message3; pub mod state; pub mod swap; mod swap_request; +mod transfer_proof; pub struct Swap { pub state: BobState, @@ -210,8 +211,9 @@ pub enum OutEvent { SwapResponse(alice::SwapResponse), Message0(Box), Message1(Box), - Message2(alice::Message2), - Message3, + Message2, + TransferProof(Box), + EncryptedSignature, } impl From for OutEvent { @@ -249,15 +251,23 @@ impl From for OutEvent { impl From for OutEvent { fn from(event: message2::OutEvent) -> Self { match event { - message2::OutEvent::Msg(msg) => OutEvent::Message2(msg), + message2::OutEvent::Msg => OutEvent::Message2, } } } -impl From for OutEvent { - fn from(event: message3::OutEvent) -> Self { +impl From for OutEvent { + fn from(event: transfer_proof::OutEvent) -> Self { match event { - message3::OutEvent::Msg => OutEvent::Message3, + transfer_proof::OutEvent::Msg(msg) => OutEvent::TransferProof(Box::new(msg)), + } + } +} + +impl From for OutEvent { + fn from(event: encrypted_signature::OutEvent) -> Self { + match event { + encrypted_signature::OutEvent::Msg => OutEvent::EncryptedSignature, } } } @@ -272,7 +282,8 @@ pub struct Behaviour { message0: message0::Behaviour, message1: message1::Behaviour, message2: message2::Behaviour, - message3: message3::Behaviour, + transfer_proof: transfer_proof::Behaviour, + encrypted_signature: encrypted_signature::Behaviour, } impl Behaviour { @@ -301,9 +312,13 @@ impl Behaviour { } /// Sends Bob's fourth message to Alice. - pub fn send_message3(&mut self, alice: PeerId, tx_redeem_encsig: EncryptedSignature) { - let msg = bob::Message3 { tx_redeem_encsig }; - self.message3.send(alice, msg); + pub fn send_encrypted_signature( + &mut self, + alice: PeerId, + tx_redeem_encsig: bitcoin::EncryptedSignature, + ) { + let msg = EncryptedSignature { tx_redeem_encsig }; + self.encrypted_signature.send(alice, msg); debug!("Sent Message3"); } diff --git a/swap/src/protocol/bob/message3.rs b/swap/src/protocol/bob/encrypted_signature.rs similarity index 70% rename from swap/src/protocol/bob/message3.rs rename to swap/src/protocol/bob/encrypted_signature.rs index f9703ad1..b3d2815a 100644 --- a/swap/src/protocol/bob/message3.rs +++ b/swap/src/protocol/bob/encrypted_signature.rs @@ -1,6 +1,5 @@ -use crate::{ - bitcoin::EncryptedSignature, - network::request_response::{AliceToBob, BobToAlice, Codec, Message3Protocol, TIMEOUT}, +use crate::network::request_response::{ + EncryptedSignatureProtocol, OneShotCodec, Request, Response, TIMEOUT, }; use libp2p::{ request_response::{ @@ -19,8 +18,8 @@ use std::{ use tracing::error; #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Message3 { - pub tx_redeem_encsig: EncryptedSignature, +pub struct EncryptedSignature { + pub tx_redeem_encsig: crate::bitcoin::EncryptedSignature, } #[derive(Debug, Copy, Clone)] @@ -28,19 +27,19 @@ pub enum OutEvent { Msg, } -/// A `NetworkBehaviour` that represents sending message 3 to Alice. +/// A `NetworkBehaviour` that represents sending encrypted signature to Alice. #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent", poll_method = "poll")] #[allow(missing_debug_implementations)] pub struct Behaviour { - rr: RequestResponse>, + rr: RequestResponse>, #[behaviour(ignore)] events: VecDeque, } impl Behaviour { - pub fn send(&mut self, alice: PeerId, msg: Message3) { - let msg = BobToAlice::Message3(msg); + pub fn send(&mut self, alice: PeerId, msg: EncryptedSignature) { + let msg = Request::EncryptedSignature(Box::new(msg)); let _id = self.rr.send_request(&alice, msg); } @@ -48,7 +47,9 @@ impl Behaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>, OutEvent>> { + ) -> Poll< + NetworkBehaviourAction>, OutEvent>, + > { if let Some(event) = self.events.pop_front() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); } @@ -65,8 +66,8 @@ impl Default for Behaviour { Self { rr: RequestResponse::new( - Codec::default(), - vec![(Message3Protocol, ProtocolSupport::Full)], + OneShotCodec::default(), + vec![(EncryptedSignatureProtocol, ProtocolSupport::Outbound)], config, ), events: Default::default(), @@ -74,8 +75,8 @@ impl Default for Behaviour { } } -impl NetworkBehaviourEventProcess> for Behaviour { - fn inject_event(&mut self, event: RequestResponseEvent) { +impl NetworkBehaviourEventProcess> for Behaviour { + fn inject_event(&mut self, event: RequestResponseEvent) { match event { RequestResponseEvent::Message { message: RequestResponseMessage::Request { .. }, @@ -85,7 +86,7 @@ impl NetworkBehaviourEventProcess> message: RequestResponseMessage::Response { response, .. }, .. } => { - if let AliceToBob::Message3 = response { + if let Response::EncryptedSignature = response { self.events.push_back(OutEvent::Msg); } } diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index 99130973..eb117432 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -3,7 +3,7 @@ use crate::{ network::{transport::SwapTransport, TokioExecutor}, protocol::{ alice, - alice::SwapResponse, + alice::{SwapResponse, TransferProof}, bob::{self, Behaviour, OutEvent, SwapRequest}, }, }; @@ -37,46 +37,46 @@ impl Default for Channels { #[derive(Debug)] pub struct EventLoopHandle { - swap_response: Receiver, - msg0: Receiver, - msg1: Receiver, - msg2: Receiver, + recv_swap_response: Receiver, + recv_message0: Receiver, + recv_message1: Receiver, + recv_transfer_proof: Receiver, conn_established: Receiver, dial_alice: Sender<()>, send_swap_request: Sender, - send_msg0: Sender, - send_msg1: Sender, - send_msg2: Sender, - send_msg3: Sender, + send_message0: Sender, + send_message1: Sender, + send_message2: Sender, + send_encrypted_signature: Sender, } impl EventLoopHandle { pub async fn recv_swap_response(&mut self) -> Result { - self.swap_response + self.recv_swap_response .recv() .await .ok_or_else(|| anyhow!("Failed to receive swap response from Alice")) } pub async fn recv_message0(&mut self) -> Result { - self.msg0 + self.recv_message0 .recv() .await .ok_or_else(|| anyhow!("Failed to receive message 0 from Alice")) } pub async fn recv_message1(&mut self) -> Result { - self.msg1 + self.recv_message1 .recv() .await .ok_or_else(|| anyhow!("Failed to receive message 1 from Alice")) } - pub async fn recv_message2(&mut self) -> Result { - self.msg2 + pub async fn recv_transfer_proof(&mut self) -> Result { + self.recv_transfer_proof .recv() .await - .ok_or_else(|| anyhow!("Failed o receive message 2 from Alice")) + .ok_or_else(|| anyhow!("Failed to receive transfer proof from Alice")) } /// Dials other party and wait for the connection to be established. @@ -99,22 +99,25 @@ impl EventLoopHandle { } pub async fn send_message0(&mut self, msg: bob::Message0) -> Result<()> { - let _ = self.send_msg0.send(msg).await?; + let _ = self.send_message0.send(msg).await?; Ok(()) } pub async fn send_message1(&mut self, msg: bob::Message1) -> Result<()> { - let _ = self.send_msg1.send(msg).await?; + let _ = self.send_message1.send(msg).await?; Ok(()) } pub async fn send_message2(&mut self, msg: bob::Message2) -> Result<()> { - let _ = self.send_msg2.send(msg).await?; + let _ = self.send_message2.send(msg).await?; Ok(()) } - pub async fn send_message3(&mut self, tx_redeem_encsig: EncryptedSignature) -> Result<()> { - let _ = self.send_msg3.send(tx_redeem_encsig).await?; + pub async fn send_encrypted_signature( + &mut self, + tx_redeem_encsig: EncryptedSignature, + ) -> Result<()> { + let _ = self.send_encrypted_signature.send(tx_redeem_encsig).await?; Ok(()) } } @@ -123,17 +126,17 @@ impl EventLoopHandle { pub struct EventLoop { swarm: libp2p::Swarm, alice_peer_id: PeerId, - swap_response: Sender, - msg0: Sender, - msg1: Sender, - msg2: Sender, - conn_established: Sender, + recv_swap_response: Sender, + recv_message0: Sender, + recv_message1: Sender, + recv_transfer_proof: Sender, dial_alice: Receiver<()>, + conn_established: Sender, send_swap_request: Receiver, - send_msg0: Receiver, - send_msg1: Receiver, - send_msg2: Receiver, - send_msg3: Receiver, + send_message0: Receiver, + send_message1: Receiver, + send_message2: Receiver, + send_encrypted_signature: Receiver, } impl EventLoop { @@ -153,45 +156,45 @@ impl EventLoop { swarm.add_address(alice_peer_id.clone(), alice_addr); let swap_response = Channels::new(); - let msg0 = Channels::new(); - let msg1 = Channels::new(); - let msg2 = Channels::new(); - let conn_established = Channels::new(); + let recv_message0 = Channels::new(); + let recv_message1 = Channels::new(); + let recv_transfer_proof = Channels::new(); let dial_alice = Channels::new(); + let conn_established = Channels::new(); let send_swap_request = Channels::new(); - let send_msg0 = Channels::new(); - let send_msg1 = Channels::new(); - let send_msg2 = Channels::new(); - let send_msg3 = Channels::new(); + let send_message0 = Channels::new(); + let send_message1 = Channels::new(); + let send_message2 = Channels::new(); + let send_encrypted_signature = Channels::new(); let event_loop = EventLoop { swarm, alice_peer_id, - swap_response: swap_response.sender, - msg0: msg0.sender, - msg1: msg1.sender, - msg2: msg2.sender, + recv_swap_response: swap_response.sender, + recv_message0: recv_message0.sender, + recv_message1: recv_message1.sender, + recv_transfer_proof: recv_transfer_proof.sender, conn_established: conn_established.sender, dial_alice: dial_alice.receiver, send_swap_request: send_swap_request.receiver, - send_msg0: send_msg0.receiver, - send_msg1: send_msg1.receiver, - send_msg2: send_msg2.receiver, - send_msg3: send_msg3.receiver, + send_message0: send_message0.receiver, + send_message1: send_message1.receiver, + send_message2: send_message2.receiver, + send_encrypted_signature: send_encrypted_signature.receiver, }; let handle = EventLoopHandle { - swap_response: swap_response.receiver, - msg0: msg0.receiver, - msg1: msg1.receiver, - msg2: msg2.receiver, + recv_swap_response: swap_response.receiver, + recv_message0: recv_message0.receiver, + recv_message1: recv_message1.receiver, + recv_transfer_proof: recv_transfer_proof.receiver, conn_established: conn_established.receiver, dial_alice: dial_alice.sender, send_swap_request: send_swap_request.sender, - send_msg0: send_msg0.sender, - send_msg1: send_msg1.sender, - send_msg2: send_msg2.sender, - send_msg3: send_msg3.sender, + send_message0: send_message0.sender, + send_message1: send_message1.sender, + send_message2: send_message2.sender, + send_encrypted_signature: send_encrypted_signature.sender, }; Ok((event_loop, handle)) @@ -206,18 +209,19 @@ impl EventLoop { let _ = self.conn_established.send(peer_id).await; } OutEvent::SwapResponse(msg) => { - let _ = self.swap_response.send(msg).await; + let _ = self.recv_swap_response.send(msg).await; }, OutEvent::Message0(msg) => { - let _ = self.msg0.send(*msg).await; + let _ = self.recv_message0.send(*msg).await; } OutEvent::Message1(msg) => { - let _ = self.msg1.send(*msg).await; + let _ = self.recv_message1.send(*msg).await; } - OutEvent::Message2(msg) => { - let _ = self.msg2.send(msg).await; + OutEvent::Message2 => info!("Alice acknowledged message 2 received"), + OutEvent::TransferProof(msg) => { + let _ = self.recv_transfer_proof.send(*msg).await; } - OutEvent::Message3 => info!("Alice acknowledged message 3 received"), + OutEvent::EncryptedSignature => info!("Alice acknowledged encrypted signature received"), } }, option = self.dial_alice.next().fuse() => { @@ -242,25 +246,25 @@ impl EventLoop { } }, - msg0 = self.send_msg0.next().fuse() => { + msg0 = self.send_message0.next().fuse() => { if let Some(msg) = msg0 { self.swarm.send_message0(self.alice_peer_id.clone(), msg); } } - msg1 = self.send_msg1.next().fuse() => { + msg1 = self.send_message1.next().fuse() => { if let Some(msg) = msg1 { self.swarm.send_message1(self.alice_peer_id.clone(), msg); } }, - msg2 = self.send_msg2.next().fuse() => { + msg2 = self.send_message2.next().fuse() => { if let Some(msg) = msg2 { self.swarm.send_message2(self.alice_peer_id.clone(), msg); } }, - msg3 = self.send_msg3.next().fuse() => { - if let Some(tx_redeem_encsig) = msg3 { - self.swarm.send_message3(self.alice_peer_id.clone(), tx_redeem_encsig); + encrypted_signature = self.send_encrypted_signature.next().fuse() => { + if let Some(tx_redeem_encsig) = encrypted_signature { + self.swarm.send_encrypted_signature(self.alice_peer_id.clone(), tx_redeem_encsig); } } } diff --git a/swap/src/protocol/bob/message1.rs b/swap/src/protocol/bob/message1.rs index f783af1d..dea01ae3 100644 --- a/swap/src/protocol/bob/message1.rs +++ b/swap/src/protocol/bob/message1.rs @@ -41,7 +41,7 @@ pub struct Behaviour { impl Behaviour { pub fn send(&mut self, alice: PeerId, msg: Message1) { - let msg = BobToAlice::Message1(msg); + let msg = BobToAlice::Message1(Box::new(msg)); let _id = self.rr.send_request(&alice, msg); } diff --git a/swap/src/protocol/bob/message2.rs b/swap/src/protocol/bob/message2.rs index edda820e..20333d95 100644 --- a/swap/src/protocol/bob/message2.rs +++ b/swap/src/protocol/bob/message2.rs @@ -1,7 +1,4 @@ -use crate::{ - network::request_response::{AliceToBob, BobToAlice, Codec, Message2Protocol, TIMEOUT}, - protocol::alice, -}; +use crate::network::request_response::{AliceToBob, BobToAlice, Codec, Message2Protocol, TIMEOUT}; use ecdsa_fun::Signature; use libp2p::{ request_response::{ @@ -25,9 +22,9 @@ pub struct Message2 { pub(crate) tx_cancel_sig: Signature, } -#[derive(Debug)] +#[derive(Clone, Copy, Debug)] pub enum OutEvent { - Msg(alice::Message2), + Msg, } /// A `NetworkBehaviour` that represents sending message 2 to Alice. @@ -42,7 +39,7 @@ pub struct Behaviour { impl Behaviour { pub fn send(&mut self, alice: PeerId, msg: Message2) { - let msg = BobToAlice::Message2(msg); + let msg = BobToAlice::Message2(Box::new(msg)); let _id = self.rr.send_request(&alice, msg); } @@ -87,9 +84,9 @@ impl NetworkBehaviourEventProcess> message: RequestResponseMessage::Response { response, .. }, .. } => { - if let AliceToBob::Message2(msg) = response { - debug!("Received Message2"); - self.events.push_back(OutEvent::Msg(msg)); + if let AliceToBob::Message2 = response { + debug!("Received Message 2 acknowledgement"); + self.events.push_back(OutEvent::Msg); } } RequestResponseEvent::InboundFailure { error, .. } => { diff --git a/swap/src/protocol/bob/state.rs b/swap/src/protocol/bob/state.rs index f06cfd17..74f09ca2 100644 --- a/swap/src/protocol/bob/state.rs +++ b/swap/src/protocol/bob/state.rs @@ -9,14 +9,10 @@ use crate::{ config::Config, monero, monero::{monero_private_key, TransferProof}, - protocol::{alice, bob, SwapAmounts}, + protocol::{alice, bob, bob::EncryptedSignature, SwapAmounts}, }; use anyhow::{anyhow, Result}; -use ecdsa_fun::{ - adaptor::{Adaptor, EncryptedSignature}, - nonce::Deterministic, - Signature, -}; +use ecdsa_fun::{adaptor::Adaptor, nonce::Deterministic, Signature}; use monero_harness::rpc::wallet::BlockHeight; use rand::{CryptoRng, RngCore}; use serde::{Deserialize, Serialize}; @@ -244,7 +240,7 @@ pub struct State2 { pub punish_address: bitcoin::Address, pub tx_lock: bitcoin::TxLock, pub tx_cancel_sig_a: Signature, - pub tx_refund_encsig: EncryptedSignature, + pub tx_refund_encsig: bitcoin::EncryptedSignature, pub min_monero_confirmations: u32, } @@ -313,7 +309,7 @@ pub struct State3 { punish_address: bitcoin::Address, pub tx_lock: bitcoin::TxLock, pub tx_cancel_sig_a: Signature, - pub tx_refund_encsig: EncryptedSignature, + pub tx_refund_encsig: bitcoin::EncryptedSignature, pub min_monero_confirmations: u32, } @@ -433,19 +429,19 @@ pub struct State4 { punish_address: bitcoin::Address, pub tx_lock: bitcoin::TxLock, pub tx_cancel_sig_a: Signature, - pub tx_refund_encsig: EncryptedSignature, + pub tx_refund_encsig: bitcoin::EncryptedSignature, pub monero_wallet_restore_blockheight: u32, } impl State4 { - pub fn next_message(&self) -> bob::Message3 { + pub fn next_message(&self) -> EncryptedSignature { let tx_redeem = bitcoin::TxRedeem::new(&self.tx_lock, &self.redeem_address); let tx_redeem_encsig = self.b.encsign(self.S_a_bitcoin, tx_redeem.digest()); - bob::Message3 { tx_redeem_encsig } + EncryptedSignature { tx_redeem_encsig } } - pub fn tx_redeem_encsig(&self) -> EncryptedSignature { + pub fn tx_redeem_encsig(&self) -> bitcoin::EncryptedSignature { let tx_redeem = bitcoin::TxRedeem::new(&self.tx_lock, &self.redeem_address); self.b.encsign(self.S_a_bitcoin, tx_redeem.digest()) } @@ -615,7 +611,7 @@ pub struct State5 { pub redeem_address: bitcoin::Address, punish_address: bitcoin::Address, pub tx_lock: bitcoin::TxLock, - tx_refund_encsig: EncryptedSignature, + tx_refund_encsig: bitcoin::EncryptedSignature, tx_cancel_sig: Signature, pub monero_wallet_restore_blockheight: u32, } diff --git a/swap/src/protocol/bob/swap.rs b/swap/src/protocol/bob/swap.rs index 049ac52f..767372be 100644 --- a/swap/src/protocol/bob/swap.rs +++ b/swap/src/protocol/bob/swap.rs @@ -130,7 +130,7 @@ where { event_loop_handle.dial().await?; - let msg2_watcher = event_loop_handle.recv_message2(); + let transfer_proof_watcher = event_loop_handle.recv_transfer_proof(); let cancel_timelock_expires = state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()); @@ -143,13 +143,11 @@ where monero_wallet.inner.block_height().await?; select! { - msg2 = msg2_watcher => { - - let msg2 = msg2?; - + transfer_proof = transfer_proof_watcher => { + let transfer_proof = transfer_proof?; BobState::XmrLockProofReceived { state: state3, - lock_transfer_proof: msg2.tx_lock_proof, + lock_transfer_proof: transfer_proof.tx_lock_proof, monero_wallet_restore_blockheight } }, @@ -235,7 +233,8 @@ where let state4_clone = state.clone(); - let enc_sig_sent_watcher = event_loop_handle.send_message3(tx_redeem_encsig); + let enc_sig_sent_watcher = + event_loop_handle.send_encrypted_signature(tx_redeem_encsig); let bitcoin_wallet = bitcoin_wallet.clone(); let cancel_timelock_expires = state4_clone.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()); diff --git a/swap/src/protocol/bob/swap_request.rs b/swap/src/protocol/bob/swap_request.rs index 121eb6de..b6a16c97 100644 --- a/swap/src/protocol/bob/swap_request.rs +++ b/swap/src/protocol/bob/swap_request.rs @@ -42,7 +42,7 @@ pub struct Behaviour { impl Behaviour { pub fn send(&mut self, alice: PeerId, swap_request: SwapRequest) -> Result { - let msg = BobToAlice::SwapRequest(swap_request); + let msg = BobToAlice::SwapRequest(Box::new(swap_request)); let id = self.rr.send_request(&alice, msg); Ok(id) @@ -71,7 +71,7 @@ impl Default for Behaviour { Self { rr: RequestResponse::new( Codec::default(), - vec![(Swap, ProtocolSupport::Full)], + vec![(Swap, ProtocolSupport::Outbound)], config, ), events: Default::default(), @@ -92,7 +92,9 @@ impl NetworkBehaviourEventProcess> } => { if let AliceToBob::SwapResponse(swap_response) = response { debug!("Received swap response"); - self.events.push_back(OutEvent { swap_response }); + self.events.push_back(OutEvent { + swap_response: *swap_response, + }); } } RequestResponseEvent::InboundFailure { error, .. } => { diff --git a/swap/src/protocol/bob/transfer_proof.rs b/swap/src/protocol/bob/transfer_proof.rs new file mode 100644 index 00000000..7f9d9565 --- /dev/null +++ b/swap/src/protocol/bob/transfer_proof.rs @@ -0,0 +1,97 @@ +use crate::{ + network::request_response::{OneShotCodec, Request, Response, TransferProofProtocol, TIMEOUT}, + protocol::alice::TransferProof, +}; +use libp2p::{ + request_response::{ + handler::RequestProtocol, ProtocolSupport, RequestResponse, RequestResponseConfig, + RequestResponseEvent, RequestResponseMessage, + }, + swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}, + NetworkBehaviour, +}; +use std::{ + collections::VecDeque, + task::{Context, Poll}, + time::Duration, +}; +use tracing::{debug, error}; + +#[derive(Debug)] +pub enum OutEvent { + Msg(TransferProof), +} + +/// A `NetworkBehaviour` that represents receiving the transfer proof from +/// Alice. +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "OutEvent", poll_method = "poll")] +#[allow(missing_debug_implementations)] +pub struct Behaviour { + rr: RequestResponse>, + #[behaviour(ignore)] + events: VecDeque, +} + +impl Behaviour { + fn poll( + &mut self, + _: &mut Context<'_>, + _: &mut impl PollParameters, + ) -> Poll>, OutEvent>> + { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } + + Poll::Pending + } +} + +impl Default for Behaviour { + fn default() -> Self { + let timeout = Duration::from_secs(TIMEOUT); + let mut config = RequestResponseConfig::default(); + config.set_request_timeout(timeout); + + Self { + rr: RequestResponse::new( + OneShotCodec::default(), + vec![(TransferProofProtocol, ProtocolSupport::Inbound)], + config, + ), + events: Default::default(), + } + } +} + +impl NetworkBehaviourEventProcess> for Behaviour { + fn inject_event(&mut self, event: RequestResponseEvent) { + match event { + RequestResponseEvent::Message { + message: + RequestResponseMessage::Request { + request, channel, .. + }, + .. + } => { + if let Request::TransferProof(msg) = request { + debug!("Received Transfer Proof"); + self.events.push_back(OutEvent::Msg(*msg)); + // Send back empty response so that the request/response protocol completes. + let _ = self.rr.send_response(channel, Response::TransferProof); + } + } + RequestResponseEvent::Message { + message: RequestResponseMessage::Response { .. }, + .. + } => panic!("Bob should not get a Response"), + RequestResponseEvent::InboundFailure { error, .. } => { + error!("Inbound failure: {:?}", error); + } + RequestResponseEvent::OutboundFailure { error, .. } => { + error!("Outbound failure: {:?}", error); + } + } + } +} diff --git a/swap/src/serde_peer_id.rs b/swap/src/serde_peer_id.rs new file mode 100644 index 00000000..b0da3ac2 --- /dev/null +++ b/swap/src/serde_peer_id.rs @@ -0,0 +1,48 @@ +//! A serde module that defines how we want to serialize PeerIds on the +//! HTTP-API. + +use libp2p::PeerId; +use serde::{de::Error, Deserialize, Deserializer, Serializer}; + +pub fn serialize(peer_id: &PeerId, serializer: S) -> Result +where + S: Serializer, +{ + let string = peer_id.to_string(); + serializer.serialize_str(&string) +} + +#[allow(dead_code)] +pub fn deserialize<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let string = String::deserialize(deserializer)?; + let peer_id = string.parse().map_err(D::Error::custom)?; + + Ok(peer_id) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde::Serialize; + use spectral::prelude::*; + + #[derive(Serialize)] + struct SerializablePeerId(#[serde(with = "super")] PeerId); + + #[test] + fn maker_id_serializes_as_expected() { + let peer_id = SerializablePeerId( + "QmfUfpC2frwFvcDzpspnfZitHt5wct6n4kpG5jzgRdsxkY" + .parse() + .unwrap(), + ); + + let got = serde_json::to_string(&peer_id).expect("failed to serialize peer id"); + + assert_that(&got) + .is_equal_to(r#""QmfUfpC2frwFvcDzpspnfZitHt5wct6n4kpG5jzgRdsxkY""#.to_string()); + } +}