From 73f30320a6ba03e2d9b908211bee8368053fe29e Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 23 Mar 2021 16:53:25 +1100 Subject: [PATCH 01/11] Seed should neither be Clone nor Copy It is better to not copy around secret data within our process to make heartbleed-like attacks harder. --- swap/src/bin/swap.rs | 10 +++++----- swap/src/seed.rs | 2 +- swap/tests/testutils/mod.rs | 1 - 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/swap/src/bin/swap.rs b/swap/src/bin/swap.rs index 7f05fa12..2455753c 100644 --- a/swap/src/bin/swap.rs +++ b/swap/src/bin/swap.rs @@ -101,7 +101,7 @@ async fn main() -> Result<()> { } let bitcoin_wallet = - init_bitcoin_wallet(electrum_rpc_url, seed, data_dir.clone(), env_config).await?; + init_bitcoin_wallet(electrum_rpc_url, &seed, data_dir.clone(), env_config).await?; let (monero_wallet, _process) = init_monero_wallet(data_dir, monero_daemon_host, env_config).await?; let bitcoin_wallet = Arc::new(bitcoin_wallet); @@ -183,7 +183,7 @@ async fn main() -> Result<()> { } let bitcoin_wallet = - init_bitcoin_wallet(electrum_rpc_url, seed, data_dir.clone(), env_config).await?; + init_bitcoin_wallet(electrum_rpc_url, &seed, data_dir.clone(), env_config).await?; let (monero_wallet, _process) = init_monero_wallet(data_dir, monero_daemon_host, env_config).await?; let bitcoin_wallet = Arc::new(bitcoin_wallet); @@ -223,7 +223,7 @@ async fn main() -> Result<()> { electrum_rpc_url, } => { let bitcoin_wallet = - init_bitcoin_wallet(electrum_rpc_url, seed, data_dir, env_config).await?; + init_bitcoin_wallet(electrum_rpc_url, &seed, data_dir, env_config).await?; let resume_state = db.get_state(swap_id)?.try_into_bob()?.into(); let cancel = @@ -248,7 +248,7 @@ async fn main() -> Result<()> { electrum_rpc_url, } => { let bitcoin_wallet = - init_bitcoin_wallet(electrum_rpc_url, seed, data_dir, env_config).await?; + init_bitcoin_wallet(electrum_rpc_url, &seed, data_dir, env_config).await?; let resume_state = db.get_state(swap_id)?.try_into_bob()?.into(); @@ -260,7 +260,7 @@ async fn main() -> Result<()> { async fn init_bitcoin_wallet( electrum_rpc_url: Url, - seed: Seed, + seed: &Seed, data_dir: PathBuf, env_config: Config, ) -> Result { diff --git a/swap/src/seed.rs b/swap/src/seed.rs index 7432adc5..1536959d 100644 --- a/swap/src/seed.rs +++ b/swap/src/seed.rs @@ -15,7 +15,7 @@ use std::path::{Path, PathBuf}; pub const SEED_LENGTH: usize = 32; -#[derive(Clone, Copy, Eq, PartialEq)] +#[derive(Eq, PartialEq)] pub struct Seed([u8; SEED_LENGTH]); impl Seed { diff --git a/swap/tests/testutils/mod.rs b/swap/tests/testutils/mod.rs index d9070d8e..d22fc2e9 100644 --- a/swap/tests/testutils/mod.rs +++ b/swap/tests/testutils/mod.rs @@ -43,7 +43,6 @@ pub struct StartingBalances { pub btc: bitcoin::Amount, } -#[derive(Clone)] struct BobParams { seed: Seed, db_path: PathBuf, From 9d0b9abde06a67e623520974101b6ad42f874c31 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 18 Mar 2021 16:59:51 +1100 Subject: [PATCH 02/11] Introduce helper function for mapping RequestResponseEvent Decomposing a RequestResponseEvent is quite verbose. We can introduce a helper function that does the matching for us and delegates to specific `From` implementations for the protocol specific bits. --- swap/src/network/quote.rs | 3 + swap/src/network/spot_price.rs | 3 + swap/src/protocol/alice/behaviour.rs | 108 ++++++++++++++------------ swap/src/protocol/alice/event_loop.rs | 4 +- swap/src/protocol/bob.rs | 104 ++++++++++++------------- 5 files changed, 114 insertions(+), 108 deletions(-) diff --git a/swap/src/network/quote.rs b/swap/src/network/quote.rs index 4e536a0b..ee938a8a 100644 --- a/swap/src/network/quote.rs +++ b/swap/src/network/quote.rs @@ -3,6 +3,7 @@ use crate::network::request_response::CborCodec; use libp2p::core::ProtocolName; use libp2p::request_response::{ ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, + RequestResponseMessage, }; use serde::{Deserialize, Serialize}; @@ -17,6 +18,8 @@ impl ProtocolName for BidQuoteProtocol { } } +pub type Message = RequestResponseMessage<(), BidQuote>; + /// Represents a quote for buying XMR. #[derive(Serialize, Deserialize, Debug, Clone)] pub struct BidQuote { diff --git a/swap/src/network/spot_price.rs b/swap/src/network/spot_price.rs index 3fcd9373..1b5aa617 100644 --- a/swap/src/network/spot_price.rs +++ b/swap/src/network/spot_price.rs @@ -3,6 +3,7 @@ use crate::{bitcoin, monero}; use libp2p::core::ProtocolName; use libp2p::request_response::{ ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, + RequestResponseMessage, }; use serde::{Deserialize, Serialize}; @@ -39,6 +40,8 @@ pub struct Response { pub type Behaviour = RequestResponse>; +pub type Message = RequestResponseMessage; + /// Constructs a new instance of the `spot-price` behaviour to be used by Alice. /// /// Alice only supports inbound connections, i.e. providing spot prices for BTC diff --git a/swap/src/protocol/alice/behaviour.rs b/swap/src/protocol/alice/behaviour.rs index a7825678..63aca002 100644 --- a/swap/src/protocol/alice/behaviour.rs +++ b/swap/src/protocol/alice/behaviour.rs @@ -7,7 +7,7 @@ use crate::protocol::alice::{ use crate::protocol::bob::EncryptedSignature; use crate::{bitcoin, monero}; use anyhow::{anyhow, Error, Result}; -use libp2p::request_response::{RequestResponseMessage, ResponseChannel}; +use libp2p::request_response::{RequestResponseEvent, RequestResponseMessage, ResponseChannel}; use libp2p::{NetworkBehaviour, PeerId}; use rand::{CryptoRng, RngCore}; use tracing::debug; @@ -16,7 +16,7 @@ use tracing::debug; pub enum OutEvent { ConnectionEstablished(PeerId), SpotPriceRequested { - msg: spot_price::Request, + request: spot_price::Request, channel: ResponseChannel, peer: PeerId, }, @@ -51,62 +51,68 @@ impl From for OutEvent { } } +impl OutEvent { + fn unexpected_response(peer: PeerId) -> OutEvent { + OutEvent::Failure { + peer, + error: anyhow!("Unexpected response received"), + } + } +} + +impl From<(PeerId, quote::Message)> for OutEvent { + fn from((peer, message): (PeerId, quote::Message)) -> Self { + match message { + quote::Message::Request { channel, .. } => OutEvent::QuoteRequested { channel, peer }, + quote::Message::Response { .. } => OutEvent::unexpected_response(peer), + } + } +} + +impl From<(PeerId, spot_price::Message)> for OutEvent { + fn from((peer, message): (PeerId, spot_price::Message)) -> Self { + match message { + spot_price::Message::Request { + request, channel, .. + } => OutEvent::SpotPriceRequested { + request, + channel, + peer, + }, + spot_price::Message::Response { .. } => OutEvent::unexpected_response(peer), + } + } +} + impl From for OutEvent { fn from(event: spot_price::OutEvent) -> Self { - match event { - spot_price::OutEvent::Message { - peer, - message: - RequestResponseMessage::Request { - channel, - request: msg, - .. - }, - } => OutEvent::SpotPriceRequested { msg, channel, peer }, - spot_price::OutEvent::Message { - message: RequestResponseMessage::Response { .. }, - peer, - } => OutEvent::Failure { - error: anyhow!("Alice is only meant to hand out spot prices, not receive them"), - peer, - }, - spot_price::OutEvent::ResponseSent { .. } => OutEvent::ResponseSent, - spot_price::OutEvent::InboundFailure { peer, error, .. } => OutEvent::Failure { - error: anyhow!("spot_price protocol failed due to {:?}", error), - peer, - }, - spot_price::OutEvent::OutboundFailure { peer, error, .. } => OutEvent::Failure { - error: anyhow!("spot_price protocol failed due to {:?}", error), - peer, - }, - } + map_rr_event_to_outevent(event) } } impl From for OutEvent { fn from(event: quote::OutEvent) -> Self { - match event { - quote::OutEvent::Message { - peer, - message: RequestResponseMessage::Request { channel, .. }, - } => OutEvent::QuoteRequested { channel, peer }, - quote::OutEvent::Message { - message: RequestResponseMessage::Response { .. }, - peer, - } => OutEvent::Failure { - error: anyhow!("Alice is only meant to hand out quotes, not receive them"), - peer, - }, - quote::OutEvent::ResponseSent { .. } => OutEvent::ResponseSent, - quote::OutEvent::InboundFailure { peer, error, .. } => OutEvent::Failure { - error: anyhow!("quote protocol failed due to {:?}", error), - peer, - }, - quote::OutEvent::OutboundFailure { peer, error, .. } => OutEvent::Failure { - error: anyhow!("quote protocol failed due to {:?}", error), - peer, - }, - } + map_rr_event_to_outevent(event) + } +} + +fn map_rr_event_to_outevent(event: RequestResponseEvent) -> OutEvent +where + OutEvent: From<(PeerId, RequestResponseMessage)>, +{ + use RequestResponseEvent::*; + + match event { + Message { message, peer, .. } => OutEvent::from((peer, message)), + ResponseSent { .. } => OutEvent::ResponseSent, + InboundFailure { peer, error, .. } => OutEvent::Failure { + error: anyhow!("protocol failed due to {:?}", error), + peer, + }, + OutboundFailure { peer, error, .. } => OutEvent::Failure { + error: anyhow!("protocol failed due to {:?}", error), + peer, + }, } } diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index c04dbdb8..c5f993dc 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -105,8 +105,8 @@ where OutEvent::ConnectionEstablished(alice) => { debug!("Connection Established with {}", alice); } - OutEvent::SpotPriceRequested { msg, channel, peer } => { - let btc = msg.btc; + OutEvent::SpotPriceRequested { request, channel, peer } => { + let btc = request.btc; let xmr = match self.handle_spot_price_request(btc, self.monero_wallet.clone()).await { Ok(xmr) => xmr, Err(e) => { diff --git a/swap/src/protocol/bob.rs b/swap/src/protocol/bob.rs index 62103aea..363d4573 100644 --- a/swap/src/protocol/bob.rs +++ b/swap/src/protocol/bob.rs @@ -7,7 +7,7 @@ use crate::{bitcoin, monero}; use anyhow::{anyhow, Error, Result}; pub use execution_setup::{Message0, Message2, Message4}; use libp2p::core::Multiaddr; -use libp2p::request_response::{RequestResponseMessage, ResponseChannel}; +use libp2p::request_response::{RequestResponseEvent, RequestResponseMessage, ResponseChannel}; use libp2p::{NetworkBehaviour, PeerId}; use std::sync::Arc; use tracing::debug; @@ -126,6 +126,30 @@ pub enum OutEvent { CommunicationError(Error), } +impl OutEvent { + fn unexpected_request() -> OutEvent { + OutEvent::CommunicationError(anyhow!("Unexpected request received")) + } +} + +impl From for OutEvent { + fn from(message: quote::Message) -> Self { + match message { + quote::Message::Request { .. } => OutEvent::unexpected_request(), + quote::Message::Response { response, .. } => OutEvent::QuoteReceived(response), + } + } +} + +impl From for OutEvent { + fn from(message: spot_price::Message) -> Self { + match message { + spot_price::Message::Request { .. } => OutEvent::unexpected_request(), + spot_price::Message::Response { response, .. } => OutEvent::SpotPriceReceived(response), + } + } +} + impl From for OutEvent { fn from(event: peer_tracker::OutEvent) -> Self { match event { @@ -138,65 +162,35 @@ impl From for OutEvent { impl From for OutEvent { fn from(event: spot_price::OutEvent) -> Self { - match event { - spot_price::OutEvent::Message { - message: RequestResponseMessage::Response { response, .. }, - .. - } => OutEvent::SpotPriceReceived(response), - spot_price::OutEvent::Message { - message: RequestResponseMessage::Request { .. }, - .. - } => OutEvent::CommunicationError(anyhow!( - "Bob is only meant to receive spot prices, not hand them out" - )), - spot_price::OutEvent::ResponseSent { .. } => OutEvent::ResponseSent, - spot_price::OutEvent::InboundFailure { peer, error, .. } => { - OutEvent::CommunicationError(anyhow!( - "spot_price protocol with peer {} failed due to {:?}", - peer, - error - )) - } - spot_price::OutEvent::OutboundFailure { peer, error, .. } => { - OutEvent::CommunicationError(anyhow!( - "spot_price protocol with peer {} failed due to {:?}", - peer, - error - )) - } - } + map_rr_event_to_outevent(event) } } impl From for OutEvent { fn from(event: quote::OutEvent) -> Self { - match event { - quote::OutEvent::Message { - message: RequestResponseMessage::Response { response, .. }, - .. - } => OutEvent::QuoteReceived(response), - quote::OutEvent::Message { - message: RequestResponseMessage::Request { .. }, - .. - } => OutEvent::CommunicationError(anyhow!( - "Bob is only meant to receive quotes, not hand them out" - )), - quote::OutEvent::ResponseSent { .. } => OutEvent::ResponseSent, - quote::OutEvent::InboundFailure { peer, error, .. } => { - OutEvent::CommunicationError(anyhow!( - "quote protocol with peer {} failed due to {:?}", - peer, - error - )) - } - quote::OutEvent::OutboundFailure { peer, error, .. } => { - OutEvent::CommunicationError(anyhow!( - "quote protocol with peer {} failed due to {:?}", - peer, - error - )) - } - } + map_rr_event_to_outevent(event) + } +} + +fn map_rr_event_to_outevent(event: RequestResponseEvent) -> OutEvent +where + OutEvent: From>, +{ + use RequestResponseEvent::*; + + match event { + Message { message, .. } => OutEvent::from(message), + ResponseSent { .. } => OutEvent::ResponseSent, + InboundFailure { peer, error, .. } => OutEvent::CommunicationError(anyhow!( + "protocol with peer {} failed due to {:?}", + peer, + error + )), + OutboundFailure { peer, error, .. } => OutEvent::CommunicationError(anyhow!( + "protocol with peer {} failed due to {:?}", + peer, + error + )), } } From 9979cc9f1f869ed98385a7b463825c8f9b88aa78 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 18 Mar 2021 17:13:19 +1100 Subject: [PATCH 03/11] Unify transfer-proof protocol to a single one Previously, we had two implementations of this protocol. To reduce code size, we make Alice and Bob use the same implementation. --- swap/src/network.rs | 1 + swap/src/network/request_response.rs | 9 --- swap/src/network/transfer_proof.rs | 44 ++++++++++++ swap/src/protocol/alice.rs | 2 - swap/src/protocol/alice/behaviour.rs | 47 +++++++------ swap/src/protocol/alice/event_loop.rs | 11 +-- swap/src/protocol/alice/transfer_proof.rs | 82 ---------------------- swap/src/protocol/bob.rs | 50 +++++++------ swap/src/protocol/bob/event_loop.rs | 13 ++-- swap/src/protocol/bob/transfer_proof.rs | 85 ----------------------- 10 files changed, 112 insertions(+), 232 deletions(-) create mode 100644 swap/src/network/transfer_proof.rs delete mode 100644 swap/src/protocol/alice/transfer_proof.rs delete mode 100644 swap/src/protocol/bob/transfer_proof.rs diff --git a/swap/src/network.rs b/swap/src/network.rs index 7a82065b..a3e2b91b 100644 --- a/swap/src/network.rs +++ b/swap/src/network.rs @@ -2,6 +2,7 @@ pub mod peer_tracker; pub mod quote; pub mod request_response; pub mod spot_price; +pub mod transfer_proof; pub mod transport; use libp2p::core::Executor; diff --git a/swap/src/network/request_response.rs b/swap/src/network/request_response.rs index 5e39e347..3c4694e9 100644 --- a/swap/src/network/request_response.rs +++ b/swap/src/network/request_response.rs @@ -15,18 +15,9 @@ pub const TIMEOUT: u64 = 3600; // One hour. /// Message receive buffer. pub const BUF_SIZE: usize = 1024 * 1024; -#[derive(Debug, Clone, Copy, Default)] -pub struct TransferProofProtocol; - #[derive(Debug, Clone, Copy, Default)] pub struct EncryptedSignatureProtocol; -impl ProtocolName for TransferProofProtocol { - fn protocol_name(&self) -> &[u8] { - b"/comit/xmr/btc/transfer_proof/1.0.0" - } -} - impl ProtocolName for EncryptedSignatureProtocol { fn protocol_name(&self) -> &[u8] { b"/comit/xmr/btc/encrypted_signature/1.0.0" diff --git a/swap/src/network/transfer_proof.rs b/swap/src/network/transfer_proof.rs new file mode 100644 index 00000000..bcca42f5 --- /dev/null +++ b/swap/src/network/transfer_proof.rs @@ -0,0 +1,44 @@ +use crate::monero; +use crate::network::request_response::CborCodec; +use libp2p::core::ProtocolName; +use libp2p::request_response::{ + ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, + RequestResponseMessage, +}; +use serde::{Deserialize, Serialize}; + +pub type OutEvent = RequestResponseEvent; + +#[derive(Debug, Clone, Copy, Default)] +pub struct TransferProofProtocol; + +impl ProtocolName for TransferProofProtocol { + fn protocol_name(&self) -> &[u8] { + b"/comit/xmr/btc/transfer_proof/1.0.0" + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Request { + pub tx_lock_proof: monero::TransferProof, +} + +pub type Behaviour = RequestResponse>; + +pub type Message = RequestResponseMessage; + +pub fn alice() -> Behaviour { + Behaviour::new( + CborCodec::default(), + vec![(TransferProofProtocol, ProtocolSupport::Outbound)], + RequestResponseConfig::default(), + ) +} + +pub fn bob() -> Behaviour { + Behaviour::new( + CborCodec::default(), + vec![(TransferProofProtocol, ProtocolSupport::Inbound)], + RequestResponseConfig::default(), + ) +} diff --git a/swap/src/protocol/alice.rs b/swap/src/protocol/alice.rs index 28ef4bce..ad3f906e 100644 --- a/swap/src/protocol/alice.rs +++ b/swap/src/protocol/alice.rs @@ -11,7 +11,6 @@ pub use self::event_loop::{EventLoop, EventLoopHandle}; pub use self::execution_setup::Message1; pub use self::state::*; pub use self::swap::{run, run_until}; -pub use self::transfer_proof::TransferProof; pub use execution_setup::Message3; mod behaviour; @@ -20,7 +19,6 @@ pub mod event_loop; mod execution_setup; pub mod state; pub mod swap; -mod transfer_proof; pub struct Swap { pub state: AliceState, diff --git a/swap/src/protocol/alice/behaviour.rs b/swap/src/protocol/alice/behaviour.rs index 63aca002..9e876ede 100644 --- a/swap/src/protocol/alice/behaviour.rs +++ b/swap/src/protocol/alice/behaviour.rs @@ -1,9 +1,7 @@ use crate::env::Config; use crate::network::quote::BidQuote; -use crate::network::{peer_tracker, quote, spot_price}; -use crate::protocol::alice::{ - encrypted_signature, execution_setup, transfer_proof, State0, State3, TransferProof, -}; +use crate::network::{peer_tracker, quote, spot_price, transfer_proof}; +use crate::protocol::alice::{encrypted_signature, execution_setup, State0, State3}; use crate::protocol::bob::EncryptedSignature; use crate::{bitcoin, monero}; use anyhow::{anyhow, Error, Result}; @@ -52,6 +50,13 @@ impl From for OutEvent { } impl OutEvent { + fn unexpected_request(peer: PeerId) -> OutEvent { + OutEvent::Failure { + peer, + error: anyhow!("Unexpected request received"), + } + } + fn unexpected_response(peer: PeerId) -> OutEvent { OutEvent::Failure { peer, @@ -84,6 +89,15 @@ impl From<(PeerId, spot_price::Message)> for OutEvent { } } +impl From<(PeerId, transfer_proof::Message)> for OutEvent { + fn from((peer, message): (PeerId, transfer_proof::Message)) -> Self { + match message { + transfer_proof::Message::Request { .. } => OutEvent::unexpected_request(peer), + transfer_proof::Message::Response { .. } => OutEvent::TransferProofAcknowledged(peer), + } + } +} + impl From for OutEvent { fn from(event: spot_price::OutEvent) -> Self { map_rr_event_to_outevent(event) @@ -96,6 +110,12 @@ impl From for OutEvent { } } +impl From for OutEvent { + fn from(event: transfer_proof::OutEvent) -> Self { + map_rr_event_to_outevent(event) + } +} + fn map_rr_event_to_outevent(event: RequestResponseEvent) -> OutEvent where OutEvent: From<(PeerId, RequestResponseMessage)>, @@ -132,19 +152,6 @@ impl From for OutEvent { } } -impl From for OutEvent { - fn from(event: transfer_proof::OutEvent) -> Self { - use crate::protocol::alice::transfer_proof::OutEvent::*; - match event { - Acknowledged(peer) => OutEvent::TransferProofAcknowledged(peer), - Failure { peer, error } => OutEvent::Failure { - peer, - error: error.context("Failure with Transfer Proof"), - }, - } - } -} - impl From for OutEvent { fn from(event: encrypted_signature::OutEvent) -> Self { use crate::protocol::alice::encrypted_signature::OutEvent::*; @@ -183,7 +190,7 @@ impl Default for Behaviour { quote: quote::alice(), spot_price: spot_price::alice(), execution_setup: Default::default(), - transfer_proof: Default::default(), + transfer_proof: transfer_proof::alice(), encrypted_signature: Default::default(), } } @@ -237,8 +244,8 @@ impl Behaviour { } /// Send Transfer Proof to Bob. - pub fn send_transfer_proof(&mut self, bob: PeerId, msg: TransferProof) { - self.transfer_proof.send(bob, msg); + pub fn send_transfer_proof(&mut self, bob: PeerId, msg: transfer_proof::Request) { + self.transfer_proof.send_request(&bob, msg); debug!("Sent Transfer Proof"); } diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index c5f993dc..1aaa9eec 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -3,8 +3,8 @@ use crate::database::Database; use crate::env::Config; use crate::monero::BalanceTooLow; use crate::network::quote::BidQuote; -use crate::network::{spot_price, transport, TokioExecutor}; -use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State3, Swap, TransferProof}; +use crate::network::{spot_price, transfer_proof, transport, TokioExecutor}; +use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State3, Swap}; use crate::protocol::bob::EncryptedSignature; use crate::seed::Seed; use crate::{bitcoin, kraken, monero}; @@ -37,7 +37,8 @@ pub struct EventLoop { recv_encrypted_signature: HashMap>, /// Stores a list of futures, waiting for transfer proof which will be sent /// to the given peer. - send_transfer_proof: FuturesUnordered>>, + send_transfer_proof: + FuturesUnordered>>, swap_sender: mpsc::Sender, } @@ -307,7 +308,7 @@ impl LatestRate for kraken::RateUpdateStream { #[derive(Debug)] pub struct EventLoopHandle { recv_encrypted_signature: Option>, - send_transfer_proof: Option>, + send_transfer_proof: Option>, } impl EventLoopHandle { @@ -327,7 +328,7 @@ impl EventLoopHandle { .send_transfer_proof .take() .context("Transfer proof was already sent")? - .send(TransferProof { tx_lock_proof: msg }) + .send(transfer_proof::Request { tx_lock_proof: msg }) .is_err() { bail!("Failed to send transfer proof, receiver no longer listening?") diff --git a/swap/src/protocol/alice/transfer_proof.rs b/swap/src/protocol/alice/transfer_proof.rs deleted file mode 100644 index 2d03add8..00000000 --- a/swap/src/protocol/alice/transfer_proof.rs +++ /dev/null @@ -1,82 +0,0 @@ -use crate::monero; -use crate::network::request_response::{CborCodec, TransferProofProtocol, TIMEOUT}; -use anyhow::{anyhow, Error}; -use libp2p::request_response::{ - ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, - RequestResponseMessage, -}; -use libp2p::{NetworkBehaviour, PeerId}; -use serde::{Deserialize, Serialize}; -use std::time::Duration; - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct TransferProof { - pub tx_lock_proof: monero::TransferProof, -} - -#[derive(Debug)] -pub enum OutEvent { - Acknowledged(PeerId), - Failure { peer: PeerId, error: Error }, -} - -/// A `NetworkBehaviour` that represents sending the Monero transfer proof to -/// Bob. -#[derive(NetworkBehaviour)] -#[behaviour(out_event = "OutEvent", event_process = false)] -#[allow(missing_debug_implementations)] -pub struct Behaviour { - rr: RequestResponse>, -} - -impl Behaviour { - pub fn send(&mut self, bob: PeerId, msg: TransferProof) { - let _id = self.rr.send_request(&bob, msg); - } -} - -impl Default for Behaviour { - fn default() -> Self { - let timeout = Duration::from_secs(TIMEOUT); - let mut config = RequestResponseConfig::default(); - config.set_request_timeout(timeout); - - Self { - rr: RequestResponse::new( - CborCodec::default(), - vec![(TransferProofProtocol, ProtocolSupport::Outbound)], - config, - ), - } - } -} - -impl From> for OutEvent { - fn from(event: RequestResponseEvent) -> Self { - match event { - RequestResponseEvent::Message { - message: RequestResponseMessage::Request { .. }, - peer, - } => OutEvent::Failure { - peer, - error: anyhow!("Alice should never get a transfer proof request from Bob"), - }, - RequestResponseEvent::Message { - message: RequestResponseMessage::Response { .. }, - peer, - } => OutEvent::Acknowledged(peer), - RequestResponseEvent::InboundFailure { error, peer, .. } => OutEvent::Failure { - peer, - error: anyhow!("Inbound failure: {:?}", error), - }, - RequestResponseEvent::OutboundFailure { error, peer, .. } => OutEvent::Failure { - peer, - error: anyhow!("Outbound failure: {:?}", error), - }, - RequestResponseEvent::ResponseSent { peer, .. } => OutEvent::Failure { - peer, - error: anyhow!("Alice should not send a response"), - }, - } - } -} diff --git a/swap/src/protocol/bob.rs b/swap/src/protocol/bob.rs index 363d4573..cb138013 100644 --- a/swap/src/protocol/bob.rs +++ b/swap/src/protocol/bob.rs @@ -1,7 +1,6 @@ use crate::database::Database; use crate::env::Config; use crate::network::{peer_tracker, spot_price}; -use crate::protocol::alice::TransferProof; use crate::protocol::bob; use crate::{bitcoin, monero}; use anyhow::{anyhow, Error, Result}; @@ -19,8 +18,8 @@ pub use self::event_loop::{EventLoop, EventLoopHandle}; pub use self::refund::refund; pub use self::state::*; pub use self::swap::{run, run_until}; -use crate::network::quote; use crate::network::quote::BidQuote; +use crate::network::{quote, transfer_proof}; pub mod cancel; mod encrypted_signature; @@ -29,7 +28,6 @@ mod execution_setup; pub mod refund; pub mod state; pub mod swap; -mod transfer_proof; pub struct Swap { pub state: BobState, @@ -117,8 +115,8 @@ pub enum OutEvent { QuoteReceived(BidQuote), SpotPriceReceived(spot_price::Response), ExecutionSetupDone(Result>), - TransferProof { - msg: Box, + TransferProofReceived { + msg: Box, channel: ResponseChannel<()>, }, EncryptedSignatureAcknowledged, @@ -130,6 +128,10 @@ impl OutEvent { fn unexpected_request() -> OutEvent { OutEvent::CommunicationError(anyhow!("Unexpected request received")) } + + fn unexpected_response() -> OutEvent { + OutEvent::CommunicationError(anyhow!("Unexpected response received")) + } } impl From for OutEvent { @@ -150,6 +152,20 @@ impl From for OutEvent { } } +impl From for OutEvent { + fn from(message: transfer_proof::Message) -> Self { + match message { + transfer_proof::Message::Request { + request, channel, .. + } => OutEvent::TransferProofReceived { + msg: Box::new(request), + channel, + }, + transfer_proof::Message::Response { .. } => OutEvent::unexpected_response(), + } + } +} + impl From for OutEvent { fn from(event: peer_tracker::OutEvent) -> Self { match event { @@ -172,6 +188,12 @@ impl From for OutEvent { } } +impl From for OutEvent { + fn from(event: transfer_proof::OutEvent) -> Self { + map_rr_event_to_outevent(event) + } +} + fn map_rr_event_to_outevent(event: RequestResponseEvent) -> OutEvent where OutEvent: From>, @@ -202,22 +224,6 @@ impl From for OutEvent { } } -impl From for OutEvent { - fn from(event: transfer_proof::OutEvent) -> Self { - use transfer_proof::OutEvent::*; - match event { - MsgReceived { msg, channel } => OutEvent::TransferProof { - msg: Box::new(msg), - channel, - }, - AckSent => OutEvent::ResponseSent, - Failure(err) => { - OutEvent::CommunicationError(err.context("Failure with Transfer Proof")) - } - } - } -} - impl From for OutEvent { fn from(event: encrypted_signature::OutEvent) -> Self { use encrypted_signature::OutEvent::*; @@ -250,7 +256,7 @@ impl Default for Behaviour { quote: quote::bob(), spot_price: spot_price::bob(), execution_setup: Default::default(), - transfer_proof: Default::default(), + transfer_proof: transfer_proof::bob(), encrypted_signature: Default::default(), } } diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index 1cc1b3b9..a279ddab 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -1,7 +1,6 @@ use crate::bitcoin::EncryptedSignature; use crate::network::quote::BidQuote; -use crate::network::{spot_price, transport, TokioExecutor}; -use crate::protocol::alice::TransferProof; +use crate::network::{spot_price, transfer_proof, transport, TokioExecutor}; use crate::protocol::bob::{Behaviour, OutEvent, State0, State2}; use crate::{bitcoin, monero}; use anyhow::{anyhow, bail, Context, Result}; @@ -36,7 +35,7 @@ impl Default for Channels { pub struct EventLoopHandle { start_execution_setup: Sender, done_execution_setup: Receiver>, - recv_transfer_proof: Receiver, + recv_transfer_proof: Receiver, conn_established: Receiver, dial_alice: Sender<()>, send_encrypted_signature: Sender, @@ -56,7 +55,7 @@ impl EventLoopHandle { .ok_or_else(|| anyhow!("Failed to setup execution with Alice"))? } - pub async fn recv_transfer_proof(&mut self) -> Result { + pub async fn recv_transfer_proof(&mut self) -> Result { self.recv_transfer_proof .recv() .await @@ -122,7 +121,7 @@ pub struct EventLoop { recv_spot_price: Sender, start_execution_setup: Receiver, done_execution_setup: Sender>, - recv_transfer_proof: Sender, + recv_transfer_proof: Sender, dial_alice: Receiver<()>, conn_established: Sender, send_encrypted_signature: Receiver, @@ -212,10 +211,10 @@ impl EventLoop { OutEvent::ExecutionSetupDone(res) => { let _ = self.done_execution_setup.send(res.map(|state|*state)).await; } - OutEvent::TransferProof{ msg, channel }=> { + OutEvent::TransferProofReceived{ msg, channel }=> { let _ = self.recv_transfer_proof.send(*msg).await; // Send back empty response so that the request/response protocol completes. - if let Err(error) = self.swarm.transfer_proof.send_ack(channel) { + if let Err(error) = self.swarm.transfer_proof.send_response(channel, ()) { error!("Failed to send Transfer Proof ack: {:?}", error); } } diff --git a/swap/src/protocol/bob/transfer_proof.rs b/swap/src/protocol/bob/transfer_proof.rs deleted file mode 100644 index 7b1ff1be..00000000 --- a/swap/src/protocol/bob/transfer_proof.rs +++ /dev/null @@ -1,85 +0,0 @@ -use crate::network::request_response::{CborCodec, TransferProofProtocol, TIMEOUT}; -use crate::protocol::alice::TransferProof; -use anyhow::{anyhow, Error, Result}; -use libp2p::request_response::{ - ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, - RequestResponseMessage, ResponseChannel, -}; -use libp2p::NetworkBehaviour; -use std::time::Duration; -use tracing::debug; - -#[derive(Debug)] -pub enum OutEvent { - MsgReceived { - msg: TransferProof, - channel: ResponseChannel<()>, - }, - AckSent, - Failure(Error), -} - -/// A `NetworkBehaviour` that represents receiving the transfer proof from -/// Alice. -#[derive(NetworkBehaviour)] -#[behaviour(out_event = "OutEvent", event_process = false)] -#[allow(missing_debug_implementations)] -pub struct Behaviour { - rr: RequestResponse>, -} - -impl Behaviour { - pub fn send_ack(&mut self, channel: ResponseChannel<()>) -> Result<()> { - self.rr - .send_response(channel, ()) - .map_err(|err| anyhow!("Failed to ack transfer proof: {:?}", err)) - } -} - -impl Default for Behaviour { - fn default() -> Self { - let timeout = Duration::from_secs(TIMEOUT); - let mut config = RequestResponseConfig::default(); - config.set_request_timeout(timeout); - - Self { - rr: RequestResponse::new( - CborCodec::default(), - vec![(TransferProofProtocol, ProtocolSupport::Inbound)], - config, - ), - } - } -} - -impl From> for OutEvent { - fn from(event: RequestResponseEvent) -> Self { - match event { - RequestResponseEvent::Message { - peer, - message: - RequestResponseMessage::Request { - request, channel, .. - }, - .. - } => { - debug!("Received Transfer Proof from {}", peer); - OutEvent::MsgReceived { - msg: request, - channel, - } - } - RequestResponseEvent::Message { - message: RequestResponseMessage::Response { .. }, - .. - } => OutEvent::Failure(anyhow!("Bob should not get a Response")), - RequestResponseEvent::InboundFailure { error, .. } => { - OutEvent::Failure(anyhow!("Inbound failure: {:?}", error)) - } - RequestResponseEvent::OutboundFailure { error, .. } => { - OutEvent::Failure(anyhow!("Outbound failure: {:?}", error)) - } - RequestResponseEvent::ResponseSent { .. } => OutEvent::AckSent, - } - } -} From 1de0b39b3251ba2e11ba6ea7e463377645bfcf58 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 18 Mar 2021 17:24:06 +1100 Subject: [PATCH 04/11] Unify encrypted-signature protocol to a single one --- swap/src/network.rs | 1 + swap/src/network/encrypted_signature.rs | 43 +++++++++ swap/src/network/request_response.rs | 12 --- swap/src/protocol/alice.rs | 1 - swap/src/protocol/alice/behaviour.rs | 54 ++++++----- .../src/protocol/alice/encrypted_signature.rs | 95 ------------------- swap/src/protocol/alice/event_loop.rs | 13 +-- swap/src/protocol/bob.rs | 39 ++++---- swap/src/protocol/bob/encrypted_signature.rs | 74 --------------- swap/src/protocol/bob/state.rs | 8 +- 10 files changed, 99 insertions(+), 241 deletions(-) create mode 100644 swap/src/network/encrypted_signature.rs delete mode 100644 swap/src/protocol/alice/encrypted_signature.rs delete mode 100644 swap/src/protocol/bob/encrypted_signature.rs diff --git a/swap/src/network.rs b/swap/src/network.rs index a3e2b91b..49672cf6 100644 --- a/swap/src/network.rs +++ b/swap/src/network.rs @@ -1,3 +1,4 @@ +pub mod encrypted_signature; pub mod peer_tracker; pub mod quote; pub mod request_response; diff --git a/swap/src/network/encrypted_signature.rs b/swap/src/network/encrypted_signature.rs new file mode 100644 index 00000000..efbeda88 --- /dev/null +++ b/swap/src/network/encrypted_signature.rs @@ -0,0 +1,43 @@ +use crate::network::request_response::CborCodec; +use libp2p::core::ProtocolName; +use libp2p::request_response::{ + ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, + RequestResponseMessage, +}; +use serde::{Deserialize, Serialize}; + +pub type OutEvent = RequestResponseEvent; + +#[derive(Debug, Clone, Copy, Default)] +pub struct EncryptedSignatureProtocol; + +impl ProtocolName for EncryptedSignatureProtocol { + fn protocol_name(&self) -> &[u8] { + b"/comit/xmr/btc/encrypted_signature/1.0.0" + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Request { + pub tx_redeem_encsig: crate::bitcoin::EncryptedSignature, +} + +pub type Behaviour = RequestResponse>; + +pub type Message = RequestResponseMessage; + +pub fn alice() -> Behaviour { + Behaviour::new( + CborCodec::default(), + vec![(EncryptedSignatureProtocol, ProtocolSupport::Inbound)], + RequestResponseConfig::default(), + ) +} + +pub fn bob() -> Behaviour { + Behaviour::new( + CborCodec::default(), + vec![(EncryptedSignatureProtocol, ProtocolSupport::Outbound)], + RequestResponseConfig::default(), + ) +} diff --git a/swap/src/network/request_response.rs b/swap/src/network/request_response.rs index 3c4694e9..055224c8 100644 --- a/swap/src/network/request_response.rs +++ b/swap/src/network/request_response.rs @@ -9,21 +9,9 @@ use std::fmt::Debug; use std::io; use std::marker::PhantomData; -/// Time to wait for a response back once we send a request. -pub const TIMEOUT: u64 = 3600; // One hour. - /// Message receive buffer. pub const BUF_SIZE: usize = 1024 * 1024; -#[derive(Debug, Clone, Copy, Default)] -pub struct EncryptedSignatureProtocol; - -impl ProtocolName for EncryptedSignatureProtocol { - fn protocol_name(&self) -> &[u8] { - b"/comit/xmr/btc/encrypted_signature/1.0.0" - } -} - #[derive(Clone, Copy, Debug)] pub struct CborCodec { phantom: PhantomData<(P, Req, Res)>, diff --git a/swap/src/protocol/alice.rs b/swap/src/protocol/alice.rs index ad3f906e..9bc6423d 100644 --- a/swap/src/protocol/alice.rs +++ b/swap/src/protocol/alice.rs @@ -14,7 +14,6 @@ pub use self::swap::{run, run_until}; pub use execution_setup::Message3; mod behaviour; -mod encrypted_signature; pub mod event_loop; mod execution_setup; pub mod state; diff --git a/swap/src/protocol/alice/behaviour.rs b/swap/src/protocol/alice/behaviour.rs index 9e876ede..c74e0356 100644 --- a/swap/src/protocol/alice/behaviour.rs +++ b/swap/src/protocol/alice/behaviour.rs @@ -1,8 +1,7 @@ use crate::env::Config; use crate::network::quote::BidQuote; -use crate::network::{peer_tracker, quote, spot_price, transfer_proof}; -use crate::protocol::alice::{encrypted_signature, execution_setup, State0, State3}; -use crate::protocol::bob::EncryptedSignature; +use crate::network::{encrypted_signature, peer_tracker, quote, spot_price, transfer_proof}; +use crate::protocol::alice::{execution_setup, State0, State3}; use crate::{bitcoin, monero}; use anyhow::{anyhow, Error, Result}; use libp2p::request_response::{RequestResponseEvent, RequestResponseMessage, ResponseChannel}; @@ -27,8 +26,8 @@ pub enum OutEvent { state3: Box, }, TransferProofAcknowledged(PeerId), - EncryptedSignature { - msg: Box, + EncryptedSignatureReceived { + msg: Box, channel: ResponseChannel<()>, peer: PeerId, }, @@ -98,6 +97,21 @@ impl From<(PeerId, transfer_proof::Message)> for OutEvent { } } +impl From<(PeerId, encrypted_signature::Message)> for OutEvent { + fn from((peer, message): (PeerId, encrypted_signature::Message)) -> Self { + match message { + encrypted_signature::Message::Request { + request, channel, .. + } => OutEvent::EncryptedSignatureReceived { + msg: Box::new(request), + channel, + peer, + }, + encrypted_signature::Message::Response { .. } => OutEvent::unexpected_response(peer), + } + } +} + impl From for OutEvent { fn from(event: spot_price::OutEvent) -> Self { map_rr_event_to_outevent(event) @@ -116,6 +130,12 @@ impl From for OutEvent { } } +impl From for OutEvent { + fn from(event: encrypted_signature::OutEvent) -> Self { + map_rr_event_to_outevent(event) + } +} + fn map_rr_event_to_outevent(event: RequestResponseEvent) -> OutEvent where OutEvent: From<(PeerId, RequestResponseMessage)>, @@ -152,24 +172,6 @@ impl From for OutEvent { } } -impl From for OutEvent { - fn from(event: encrypted_signature::OutEvent) -> Self { - use crate::protocol::alice::encrypted_signature::OutEvent::*; - match event { - MsgReceived { msg, channel, peer } => OutEvent::EncryptedSignature { - msg: Box::new(msg), - channel, - peer, - }, - AckSent => OutEvent::ResponseSent, - Failure { peer, error } => OutEvent::Failure { - peer, - error: error.context("Failure with Encrypted Signature"), - }, - } - } -} - /// A `NetworkBehaviour` that represents an XMR/BTC swap node as Alice. #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent", event_process = false)] @@ -191,7 +193,7 @@ impl Default for Behaviour { spot_price: spot_price::alice(), execution_setup: Default::default(), transfer_proof: transfer_proof::alice(), - encrypted_signature: Default::default(), + encrypted_signature: encrypted_signature::alice(), } } } @@ -249,7 +251,7 @@ impl Behaviour { debug!("Sent Transfer Proof"); } - pub fn send_encrypted_signature_ack(&mut self, channel: ResponseChannel<()>) -> Result<()> { - self.encrypted_signature.send_ack(channel) + pub fn send_encrypted_signature_ack(&mut self, channel: ResponseChannel<()>) { + let _ = self.encrypted_signature.send_response(channel, ()); } } diff --git a/swap/src/protocol/alice/encrypted_signature.rs b/swap/src/protocol/alice/encrypted_signature.rs deleted file mode 100644 index 1cf1f14c..00000000 --- a/swap/src/protocol/alice/encrypted_signature.rs +++ /dev/null @@ -1,95 +0,0 @@ -use crate::network::request_response::{CborCodec, EncryptedSignatureProtocol, TIMEOUT}; -use crate::protocol::bob::EncryptedSignature; -use anyhow::{anyhow, Error, Result}; -use libp2p::request_response::{ - ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, - RequestResponseMessage, ResponseChannel, -}; -use libp2p::{NetworkBehaviour, PeerId}; -use std::time::Duration; -use tracing::debug; - -#[derive(Debug)] -pub enum OutEvent { - MsgReceived { - msg: EncryptedSignature, - channel: ResponseChannel<()>, - peer: PeerId, - }, - AckSent, - Failure { - peer: PeerId, - error: Error, - }, -} - -/// A `NetworkBehaviour` that represents receiving the Bitcoin encrypted -/// signature from Bob. -#[derive(NetworkBehaviour)] -#[behaviour(out_event = "OutEvent", event_process = false)] -#[allow(missing_debug_implementations)] -pub struct Behaviour { - rr: RequestResponse>, -} - -impl Behaviour { - pub fn send_ack(&mut self, channel: ResponseChannel<()>) -> Result<()> { - self.rr - .send_response(channel, ()) - .map_err(|err| anyhow!("Failed to ack encrypted signature: {:?}", err)) - } -} - -impl Default for Behaviour { - fn default() -> Self { - let timeout = Duration::from_secs(TIMEOUT); - let mut config = RequestResponseConfig::default(); - config.set_request_timeout(timeout); - - Self { - rr: RequestResponse::new( - CborCodec::default(), - vec![(EncryptedSignatureProtocol, ProtocolSupport::Inbound)], - config, - ), - } - } -} - -impl From> for OutEvent { - fn from(event: RequestResponseEvent) -> Self { - match event { - RequestResponseEvent::Message { - peer, - message: - RequestResponseMessage::Request { - request, channel, .. - }, - .. - } => { - debug!("Received encrypted signature from {}", peer); - OutEvent::MsgReceived { - msg: request, - channel, - peer, - } - } - RequestResponseEvent::Message { - message: RequestResponseMessage::Response { .. }, - peer, - } => OutEvent::Failure { - peer, - error: anyhow!("Alice should not get a Response"), - }, - RequestResponseEvent::InboundFailure { error, peer, .. } => OutEvent::Failure { - peer, - error: anyhow!("Inbound failure: {:?}", error), - }, - RequestResponseEvent::OutboundFailure { error, peer, .. } => OutEvent::Failure { - peer, - error: anyhow!("Outbound failure: {:?}", error), - }, - RequestResponseEvent::ResponseSent { .. } => OutEvent::AckSent, - } - } -} diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 1aaa9eec..1b7e195c 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -3,9 +3,8 @@ use crate::database::Database; use crate::env::Config; use crate::monero::BalanceTooLow; use crate::network::quote::BidQuote; -use crate::network::{spot_price, transfer_proof, transport, TokioExecutor}; +use crate::network::{encrypted_signature, spot_price, transfer_proof, transport, TokioExecutor}; use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State3, Swap}; -use crate::protocol::bob::EncryptedSignature; use crate::seed::Seed; use crate::{bitcoin, kraken, monero}; use anyhow::{bail, Context, Result}; @@ -34,7 +33,7 @@ pub struct EventLoop { max_buy: bitcoin::Amount, /// Stores a sender per peer for incoming [`EncryptedSignature`]s. - recv_encrypted_signature: HashMap>, + recv_encrypted_signature: HashMap>, /// Stores a list of futures, waiting for transfer proof which will be sent /// to the given peer. send_transfer_proof: @@ -156,7 +155,7 @@ where OutEvent::TransferProofAcknowledged(peer) => { trace!(%peer, "Bob acknowledged transfer proof"); } - OutEvent::EncryptedSignature{ msg, channel, peer } => { + OutEvent::EncryptedSignatureReceived{ msg, channel, peer } => { match self.recv_encrypted_signature.remove(&peer) { Some(sender) => { // this failing just means the receiver is no longer interested ... @@ -167,9 +166,7 @@ where } } - if let Err(error) = self.swarm.send_encrypted_signature_ack(channel) { - error!("Failed to send Encrypted Signature ack: {:?}", error); - } + self.swarm.send_encrypted_signature_ack(channel); } OutEvent::ResponseSent => {} OutEvent::Failure {peer, error} => { @@ -307,7 +304,7 @@ impl LatestRate for kraken::RateUpdateStream { #[derive(Debug)] pub struct EventLoopHandle { - recv_encrypted_signature: Option>, + recv_encrypted_signature: Option>, send_transfer_proof: Option>, } diff --git a/swap/src/protocol/bob.rs b/swap/src/protocol/bob.rs index cb138013..3e5a7e0c 100644 --- a/swap/src/protocol/bob.rs +++ b/swap/src/protocol/bob.rs @@ -1,6 +1,6 @@ use crate::database::Database; use crate::env::Config; -use crate::network::{peer_tracker, spot_price}; +use crate::network::{encrypted_signature, peer_tracker, spot_price}; use crate::protocol::bob; use crate::{bitcoin, monero}; use anyhow::{anyhow, Error, Result}; @@ -13,7 +13,6 @@ use tracing::debug; use uuid::Uuid; pub use self::cancel::cancel; -pub use self::encrypted_signature::EncryptedSignature; pub use self::event_loop::{EventLoop, EventLoopHandle}; pub use self::refund::refund; pub use self::state::*; @@ -22,7 +21,6 @@ use crate::network::quote::BidQuote; use crate::network::{quote, transfer_proof}; pub mod cancel; -mod encrypted_signature; pub mod event_loop; mod execution_setup; pub mod refund; @@ -166,6 +164,17 @@ impl From for OutEvent { } } +impl From for OutEvent { + fn from(message: encrypted_signature::Message) -> Self { + match message { + encrypted_signature::Message::Request { .. } => OutEvent::unexpected_request(), + encrypted_signature::Message::Response { .. } => { + OutEvent::EncryptedSignatureAcknowledged + } + } + } +} + impl From for OutEvent { fn from(event: peer_tracker::OutEvent) -> Self { match event { @@ -194,6 +203,12 @@ impl From for OutEvent { } } +impl From for OutEvent { + fn from(event: encrypted_signature::OutEvent) -> Self { + map_rr_event_to_outevent(event) + } +} + fn map_rr_event_to_outevent(event: RequestResponseEvent) -> OutEvent where OutEvent: From>, @@ -224,18 +239,6 @@ impl From for OutEvent { } } -impl From for OutEvent { - fn from(event: encrypted_signature::OutEvent) -> Self { - use encrypted_signature::OutEvent::*; - match event { - Acknowledged => OutEvent::EncryptedSignatureAcknowledged, - Failure(err) => { - OutEvent::CommunicationError(err.context("Failure with Encrypted Signature")) - } - } - } -} - /// A `NetworkBehaviour` that represents an XMR/BTC swap node as Bob. #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent", event_process = false)] @@ -257,7 +260,7 @@ impl Default for Behaviour { spot_price: spot_price::bob(), execution_setup: Default::default(), transfer_proof: transfer_proof::bob(), - encrypted_signature: Default::default(), + encrypted_signature: encrypted_signature::bob(), } } } @@ -286,8 +289,8 @@ impl Behaviour { alice: PeerId, tx_redeem_encsig: bitcoin::EncryptedSignature, ) { - let msg = EncryptedSignature { tx_redeem_encsig }; - self.encrypted_signature.send(alice, msg); + let msg = encrypted_signature::Request { tx_redeem_encsig }; + self.encrypted_signature.send_request(&alice, msg); debug!("Encrypted signature sent"); } diff --git a/swap/src/protocol/bob/encrypted_signature.rs b/swap/src/protocol/bob/encrypted_signature.rs deleted file mode 100644 index ed121975..00000000 --- a/swap/src/protocol/bob/encrypted_signature.rs +++ /dev/null @@ -1,74 +0,0 @@ -use crate::network::request_response::{CborCodec, EncryptedSignatureProtocol, TIMEOUT}; -use anyhow::{anyhow, Error}; -use libp2p::request_response::{ - ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, - RequestResponseMessage, -}; -use libp2p::{NetworkBehaviour, PeerId}; -use serde::{Deserialize, Serialize}; -use std::time::Duration; - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct EncryptedSignature { - pub tx_redeem_encsig: crate::bitcoin::EncryptedSignature, -} - -#[derive(Debug)] -pub enum OutEvent { - Acknowledged, - Failure(Error), -} - -/// A `NetworkBehaviour` that represents sending encrypted signature to Alice. -#[derive(NetworkBehaviour)] -#[behaviour(out_event = "OutEvent", event_process = false)] -#[allow(missing_debug_implementations)] -pub struct Behaviour { - rr: RequestResponse>, -} - -impl Behaviour { - pub fn send(&mut self, alice: PeerId, msg: EncryptedSignature) { - let _id = self.rr.send_request(&alice, msg); - } -} - -impl Default for Behaviour { - fn default() -> Self { - let timeout = Duration::from_secs(TIMEOUT); - let mut config = RequestResponseConfig::default(); - config.set_request_timeout(timeout); - - Self { - rr: RequestResponse::new( - CborCodec::default(), - vec![(EncryptedSignatureProtocol, ProtocolSupport::Outbound)], - config, - ), - } - } -} - -impl From> for OutEvent { - fn from(event: RequestResponseEvent) -> Self { - match event { - RequestResponseEvent::Message { - message: RequestResponseMessage::Request { .. }, - .. - } => OutEvent::Failure(anyhow!("Bob should never get a request from Alice")), - RequestResponseEvent::Message { - message: RequestResponseMessage::Response { .. }, - .. - } => OutEvent::Acknowledged, - RequestResponseEvent::InboundFailure { error, .. } => { - OutEvent::Failure(anyhow!("Inbound failure: {:?}", error)) - } - RequestResponseEvent::OutboundFailure { error, .. } => { - OutEvent::Failure(anyhow!("Outbound failure: {:?}", error)) - } - RequestResponseEvent::ResponseSent { .. } => OutEvent::Failure(anyhow!( - "Bob does not send the encrypted signature response to Alice" - )), - } - } -} diff --git a/swap/src/protocol/bob/state.rs b/swap/src/protocol/bob/state.rs index 28aa05a0..b2703a51 100644 --- a/swap/src/protocol/bob/state.rs +++ b/swap/src/protocol/bob/state.rs @@ -7,7 +7,7 @@ use crate::monero::wallet::WatchRequest; use crate::monero::{monero_private_key, TransferProof}; use crate::monero_ext::ScalarExt; use crate::protocol::alice::{Message1, Message3}; -use crate::protocol::bob::{EncryptedSignature, Message0, Message2, Message4}; +use crate::protocol::bob::{Message0, Message2, Message4}; use crate::protocol::CROSS_CURVE_PROOF_SYSTEM; use anyhow::{anyhow, bail, Context, Result}; use ecdsa_fun::adaptor::{Adaptor, HashTranscript}; @@ -404,12 +404,6 @@ pub struct State4 { } impl State4 { - pub fn next_message(&self) -> EncryptedSignature { - EncryptedSignature { - tx_redeem_encsig: self.tx_redeem_encsig(), - } - } - pub fn tx_redeem_encsig(&self) -> bitcoin::EncryptedSignature { let tx_redeem = bitcoin::TxRedeem::new(&self.tx_lock, &self.redeem_address); self.b.encsign(self.S_a_bitcoin, tx_redeem.digest()) From 0c0a322a8f6bb63166c5222ebb8bcb2c5a308556 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 18 Mar 2021 17:24:54 +1100 Subject: [PATCH 05/11] Rename module to better represent what it contains This module provides an implementation of the RequestResponseCodec using a cbor serialization. --- swap/src/network.rs | 2 +- .../network/{request_response.rs => cbor_request_response.rs} | 0 swap/src/network/encrypted_signature.rs | 2 +- swap/src/network/quote.rs | 2 +- swap/src/network/spot_price.rs | 2 +- swap/src/network/transfer_proof.rs | 2 +- swap/src/protocol/alice/execution_setup.rs | 2 +- swap/src/protocol/bob/execution_setup.rs | 2 +- 8 files changed, 7 insertions(+), 7 deletions(-) rename swap/src/network/{request_response.rs => cbor_request_response.rs} (100%) diff --git a/swap/src/network.rs b/swap/src/network.rs index 49672cf6..388cd1e0 100644 --- a/swap/src/network.rs +++ b/swap/src/network.rs @@ -1,7 +1,7 @@ +pub mod cbor_request_response; pub mod encrypted_signature; pub mod peer_tracker; pub mod quote; -pub mod request_response; pub mod spot_price; pub mod transfer_proof; pub mod transport; diff --git a/swap/src/network/request_response.rs b/swap/src/network/cbor_request_response.rs similarity index 100% rename from swap/src/network/request_response.rs rename to swap/src/network/cbor_request_response.rs diff --git a/swap/src/network/encrypted_signature.rs b/swap/src/network/encrypted_signature.rs index efbeda88..1f5d94f9 100644 --- a/swap/src/network/encrypted_signature.rs +++ b/swap/src/network/encrypted_signature.rs @@ -1,4 +1,4 @@ -use crate::network::request_response::CborCodec; +use crate::network::cbor_request_response::CborCodec; use libp2p::core::ProtocolName; use libp2p::request_response::{ ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, diff --git a/swap/src/network/quote.rs b/swap/src/network/quote.rs index ee938a8a..9c649d2c 100644 --- a/swap/src/network/quote.rs +++ b/swap/src/network/quote.rs @@ -1,5 +1,5 @@ use crate::bitcoin; -use crate::network::request_response::CborCodec; +use crate::network::cbor_request_response::CborCodec; use libp2p::core::ProtocolName; use libp2p::request_response::{ ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, diff --git a/swap/src/network/spot_price.rs b/swap/src/network/spot_price.rs index 1b5aa617..2a57db09 100644 --- a/swap/src/network/spot_price.rs +++ b/swap/src/network/spot_price.rs @@ -1,4 +1,4 @@ -use crate::network::request_response::CborCodec; +use crate::network::cbor_request_response::CborCodec; use crate::{bitcoin, monero}; use libp2p::core::ProtocolName; use libp2p::request_response::{ diff --git a/swap/src/network/transfer_proof.rs b/swap/src/network/transfer_proof.rs index bcca42f5..3cf83adb 100644 --- a/swap/src/network/transfer_proof.rs +++ b/swap/src/network/transfer_proof.rs @@ -1,5 +1,5 @@ use crate::monero; -use crate::network::request_response::CborCodec; +use crate::network::cbor_request_response::CborCodec; use libp2p::core::ProtocolName; use libp2p::request_response::{ ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, diff --git a/swap/src/protocol/alice/execution_setup.rs b/swap/src/protocol/alice/execution_setup.rs index b2ec5162..965b6bd6 100644 --- a/swap/src/protocol/alice/execution_setup.rs +++ b/swap/src/protocol/alice/execution_setup.rs @@ -1,5 +1,5 @@ use crate::bitcoin::{EncryptedSignature, Signature}; -use crate::network::request_response::BUF_SIZE; +use crate::network::cbor_request_response::BUF_SIZE; use crate::protocol::alice::{State0, State3}; use crate::protocol::bob::{Message0, Message2, Message4}; use crate::{bitcoin, monero}; diff --git a/swap/src/protocol/bob/execution_setup.rs b/swap/src/protocol/bob/execution_setup.rs index f2106737..6fa7491e 100644 --- a/swap/src/protocol/bob/execution_setup.rs +++ b/swap/src/protocol/bob/execution_setup.rs @@ -1,5 +1,5 @@ use crate::bitcoin::Signature; -use crate::network::request_response::BUF_SIZE; +use crate::network::cbor_request_response::BUF_SIZE; use crate::protocol::alice::{Message1, Message3}; use crate::protocol::bob::{State0, State2}; use anyhow::{Context, Error, Result}; From 2c9ab4f6ebb18903c3a6d930f48dc555a98dde22 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 18 Mar 2021 17:48:54 +1100 Subject: [PATCH 06/11] Improve code structure and error messages for running swaps The quote message was repeated and we should set the overall failure into a context to know what went wrong. --- swap/src/bin/swap.rs | 18 +++++++++--------- swap/src/protocol/bob/event_loop.rs | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/swap/src/bin/swap.rs b/swap/src/bin/swap.rs index 2455753c..8af25443 100644 --- a/swap/src/bin/swap.rs +++ b/swap/src/bin/swap.rs @@ -111,7 +111,7 @@ async fn main() -> Result<()> { alice_addr, bitcoin_wallet.clone(), )?; - let handle = tokio::spawn(event_loop.run()); + let event_loop = tokio::spawn(event_loop.run()); let send_bitcoin = determine_btc_to_swap( event_loop_handle.request_quote(), @@ -142,13 +142,14 @@ async fn main() -> Result<()> { .with_init_params(send_bitcoin) .build()?; - let swap = bob::run(swap); tokio::select! { - event_loop_result = handle => { - event_loop_result??; + result = event_loop => { + result + .context("EventLoop panicked")? + .context("EventLoop failed")?; }, - swap_result = swap => { - swap_result?; + result = bob::run(swap) => { + result.context("Failed to complete swap")?; } } } @@ -207,12 +208,11 @@ async fn main() -> Result<()> { ) .build()?; - let swap = bob::run(swap); tokio::select! { event_loop_result = handle => { event_loop_result??; }, - swap_result = swap => { + swap_result = bob::run(swap) => { swap_result?; } } @@ -314,7 +314,7 @@ async fn determine_btc_to_swap( ) -> Result { debug!("Requesting quote"); - let bid_quote = request_quote.await.context("Failed to request quote")?; + let bid_quote = request_quote.await?; info!("Received quote: 1 XMR ~ {}", bid_quote.price); diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index a279ddab..47c7c195 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -223,7 +223,7 @@ impl EventLoop { } OutEvent::ResponseSent => {} OutEvent::CommunicationError(err) => { - bail!("Communication error: {:#}", err) + bail!(err.context("Communication error")) } } }, From 2200fce3f3e9f02d26aba4eb4dc739c9153745bf Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 23 Mar 2021 16:56:04 +1100 Subject: [PATCH 07/11] Pass Swarm into EventLoop This reduces the amount of arguments we need to pass into the eventloop at the expense of slightly more setup of the swarm. --- swap/src/bin/asb.rs | 11 ++++++++--- swap/src/bin/swap.rs | 26 +++++++++++++------------- swap/src/network.rs | 17 +---------------- swap/src/network/swarm.rs | 23 +++++++++++++++++++++++ swap/src/protocol/alice/event_loop.rs | 26 +++----------------------- swap/src/protocol/bob/event_loop.rs | 23 +++-------------------- swap/tests/testutils/mod.rs | 19 ++++++++++--------- 7 files changed, 61 insertions(+), 84 deletions(-) create mode 100644 swap/src/network/swarm.rs diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index b148ced7..b8a00da8 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -15,6 +15,7 @@ use anyhow::{Context, Result}; use bdk::descriptor::Segwitv0; use bdk::keys::DerivableKey; +use libp2p::Swarm; use prettytable::{row, Table}; use std::path::Path; use std::sync::Arc; @@ -27,7 +28,8 @@ use swap::database::Database; use swap::env::GetConfig; use swap::fs::default_config_path; use swap::monero::Amount; -use swap::protocol::alice::{run, EventLoop}; +use swap::network::swarm; +use swap::protocol::alice::{run, Behaviour, EventLoop}; use swap::seed::Seed; use swap::trace::init_tracing; use swap::{bitcoin, env, kraken, monero}; @@ -93,9 +95,12 @@ async fn main() -> Result<()> { let kraken_rate_updates = kraken::connect()?; + let mut swarm = swarm::new::(&seed)?; + Swarm::listen_on(&mut swarm, config.network.listen) + .context("Failed to listen network interface")?; + let (event_loop, mut swap_receiver) = EventLoop::new( - config.network.listen, - seed, + swarm, env_config, Arc::new(bitcoin_wallet), Arc::new(monero_wallet), diff --git a/swap/src/bin/swap.rs b/swap/src/bin/swap.rs index 8af25443..da3d2a23 100644 --- a/swap/src/bin/swap.rs +++ b/swap/src/bin/swap.rs @@ -25,8 +25,9 @@ use swap::cli::command::{AliceConnectParams, Arguments, Command, Data, MoneroPar use swap::database::Database; use swap::env::{Config, GetConfig}; use swap::network::quote::BidQuote; +use swap::network::swarm; use swap::protocol::bob; -use swap::protocol::bob::{Builder, EventLoop}; +use swap::protocol::bob::{Behaviour, Builder, EventLoop}; use swap::seed::Seed; use swap::{bitcoin, env, monero}; use tracing::{debug, error, info, warn, Level}; @@ -105,12 +106,12 @@ async fn main() -> Result<()> { let (monero_wallet, _process) = init_monero_wallet(data_dir, monero_daemon_host, env_config).await?; let bitcoin_wallet = Arc::new(bitcoin_wallet); - let (event_loop, mut event_loop_handle) = EventLoop::new( - &seed.derive_libp2p_identity(), - alice_peer_id, - alice_addr, - bitcoin_wallet.clone(), - )?; + + let mut swarm = swarm::new::(&seed)?; + swarm.add_address(alice_peer_id, alice_addr); + + let (event_loop, mut event_loop_handle) = + EventLoop::new(swarm, alice_peer_id, bitcoin_wallet.clone())?; let event_loop = tokio::spawn(event_loop.run()); let send_bitcoin = determine_btc_to_swap( @@ -189,12 +190,11 @@ async fn main() -> Result<()> { init_monero_wallet(data_dir, monero_daemon_host, env_config).await?; let bitcoin_wallet = Arc::new(bitcoin_wallet); - let (event_loop, event_loop_handle) = EventLoop::new( - &seed.derive_libp2p_identity(), - alice_peer_id, - alice_addr, - bitcoin_wallet.clone(), - )?; + let mut swarm = swarm::new::(&seed)?; + swarm.add_address(alice_peer_id, alice_addr); + + let (event_loop, event_loop_handle) = + EventLoop::new(swarm, alice_peer_id, bitcoin_wallet.clone())?; let handle = tokio::spawn(event_loop.run()); let swap = Builder::new( diff --git a/swap/src/network.rs b/swap/src/network.rs index 388cd1e0..f279fb50 100644 --- a/swap/src/network.rs +++ b/swap/src/network.rs @@ -3,21 +3,6 @@ pub mod encrypted_signature; pub mod peer_tracker; pub mod quote; pub mod spot_price; +pub mod swarm; pub mod transfer_proof; pub mod transport; - -use libp2p::core::Executor; -use std::future::Future; -use std::pin::Pin; -use tokio::runtime::Handle; - -#[allow(missing_debug_implementations)] -pub struct TokioExecutor { - pub handle: Handle, -} - -impl Executor for TokioExecutor { - fn exec(&self, future: Pin + Send>>) { - let _ = self.handle.spawn(future); - } -} diff --git a/swap/src/network/swarm.rs b/swap/src/network/swarm.rs new file mode 100644 index 00000000..40e9f6d4 --- /dev/null +++ b/swap/src/network/swarm.rs @@ -0,0 +1,23 @@ +use crate::network::transport; +use crate::seed::Seed; +use anyhow::Result; +use libp2p::swarm::{NetworkBehaviour, SwarmBuilder}; +use libp2p::Swarm; + +pub fn new(seed: &Seed) -> Result> +where + B: NetworkBehaviour + Default, +{ + let identity = seed.derive_libp2p_identity(); + + let behaviour = B::default(); + let transport = transport::build(&identity)?; + + let swarm = SwarmBuilder::new(transport, behaviour, identity.public().into_peer_id()) + .executor(Box::new(|f| { + tokio::spawn(f); + })) + .build(); + + Ok(swarm) +} diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 1b7e195c..dcc6f638 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -3,15 +3,13 @@ use crate::database::Database; use crate::env::Config; use crate::monero::BalanceTooLow; use crate::network::quote::BidQuote; -use crate::network::{encrypted_signature, spot_price, transfer_proof, transport, TokioExecutor}; +use crate::network::{encrypted_signature, spot_price, transfer_proof}; use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State3, Swap}; -use crate::seed::Seed; use crate::{bitcoin, kraken, monero}; use anyhow::{bail, Context, Result}; use futures::future; use futures::future::{BoxFuture, FutureExt}; use futures::stream::{FuturesUnordered, StreamExt}; -use libp2p::core::Multiaddr; use libp2p::{PeerId, Swarm}; use rand::rngs::OsRng; use std::collections::HashMap; @@ -24,7 +22,6 @@ use uuid::Uuid; #[allow(missing_debug_implementations)] pub struct EventLoop { swarm: libp2p::Swarm, - peer_id: PeerId, env_config: Config, bitcoin_wallet: Arc, monero_wallet: Arc, @@ -46,10 +43,8 @@ impl EventLoop where LR: LatestRate, { - #[allow(clippy::too_many_arguments)] pub fn new( - listen_address: Multiaddr, - seed: Seed, + swarm: Swarm, env_config: Config, bitcoin_wallet: Arc, monero_wallet: Arc, @@ -57,25 +52,10 @@ where latest_rate: LR, max_buy: bitcoin::Amount, ) -> Result<(Self, mpsc::Receiver)> { - let identity = seed.derive_libp2p_identity(); - let behaviour = Behaviour::default(); - let transport = transport::build(&identity)?; - let peer_id = PeerId::from(identity.public()); - - let mut swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, peer_id) - .executor(Box::new(TokioExecutor { - handle: tokio::runtime::Handle::current(), - })) - .build(); - - Swarm::listen_on(&mut swarm, listen_address.clone()) - .with_context(|| format!("Address is not supported: {:#}", listen_address))?; - let swap_channel = MpscChannels::default(); let event_loop = EventLoop { swarm, - peer_id, env_config, bitcoin_wallet, monero_wallet, @@ -90,7 +70,7 @@ where } pub fn peer_id(&self) -> PeerId { - self.peer_id + *Swarm::local_peer_id(&self.swarm) } pub async fn run(mut self) { diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index 47c7c195..16472190 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -1,12 +1,11 @@ use crate::bitcoin::EncryptedSignature; use crate::network::quote::BidQuote; -use crate::network::{spot_price, transfer_proof, transport, TokioExecutor}; +use crate::network::{spot_price, transfer_proof}; use crate::protocol::bob::{Behaviour, OutEvent, State0, State2}; use crate::{bitcoin, monero}; use anyhow::{anyhow, bail, Context, Result}; use futures::FutureExt; -use libp2p::core::Multiaddr; -use libp2p::PeerId; +use libp2p::{PeerId, Swarm}; use std::convert::Infallible; use std::sync::Arc; use tokio::sync::mpsc::{Receiver, Sender}; @@ -131,26 +130,10 @@ pub struct EventLoop { impl EventLoop { pub fn new( - identity: &libp2p::core::identity::Keypair, + swarm: Swarm, alice_peer_id: PeerId, - alice_addr: Multiaddr, bitcoin_wallet: Arc, ) -> Result<(Self, EventLoopHandle)> { - let behaviour = Behaviour::default(); - let transport = transport::build(identity)?; - - let mut swarm = libp2p::swarm::SwarmBuilder::new( - transport, - behaviour, - identity.public().into_peer_id(), - ) - .executor(Box::new(TokioExecutor { - handle: tokio::runtime::Handle::current(), - })) - .build(); - - swarm.add_address(alice_peer_id, alice_addr); - let start_execution_setup = Channels::new(); let done_execution_setup = Channels::new(); let recv_transfer_proof = Channels::new(); diff --git a/swap/tests/testutils/mod.rs b/swap/tests/testutils/mod.rs index d22fc2e9..1235462e 100644 --- a/swap/tests/testutils/mod.rs +++ b/swap/tests/testutils/mod.rs @@ -7,7 +7,7 @@ use bitcoin_harness::{BitcoindRpcApi, Client}; use futures::Future; use get_port::get_port; use libp2p::core::Multiaddr; -use libp2p::PeerId; +use libp2p::{PeerId, Swarm}; use monero_harness::{image, Monero}; use std::convert::Infallible; use std::path::{Path, PathBuf}; @@ -17,6 +17,7 @@ use swap::asb::FixedRate; use swap::bitcoin::{CancelTimelock, PunishTimelock}; use swap::database::Database; use swap::env::{Config, GetConfig}; +use swap::network::swarm; use swap::protocol::alice::{AliceState, Swap}; use swap::protocol::bob::BobState; use swap::protocol::{alice, bob}; @@ -70,12 +71,10 @@ impl BobParams { } pub fn new_eventloop(&self) -> Result<(bob::EventLoop, bob::EventLoopHandle)> { - bob::EventLoop::new( - &self.seed.derive_libp2p_identity(), - self.alice_peer_id, - self.alice_address.clone(), - self.bitcoin_wallet.clone(), - ) + let mut swarm = swarm::new::(&self.seed)?; + swarm.add_address(self.alice_peer_id, self.alice_address.clone()); + + bob::EventLoop::new(swarm, self.alice_peer_id, self.bitcoin_wallet.clone()) } } @@ -384,9 +383,11 @@ where ) .await; + let mut alice_swarm = swarm::new::(&alice_seed).unwrap(); + Swarm::listen_on(&mut alice_swarm, alice_listen_address.clone()).unwrap(); + let (alice_event_loop, alice_swap_handle) = alice::EventLoop::new( - alice_listen_address.clone(), - alice_seed, + alice_swarm, env_config, alice_bitcoin_wallet.clone(), alice_monero_wallet.clone(), From 804b34f6b0dc5e60a6f8dbe27cb34ffe48bc64a5 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 18 Mar 2021 15:54:33 +1100 Subject: [PATCH 08/11] Listen on all swarm events instead of just behaviour events --- swap/src/protocol/alice/event_loop.rs | 36 ++++++++++++++++++++------- swap/src/protocol/bob/event_loop.rs | 22 +++++++++------- 2 files changed, 40 insertions(+), 18 deletions(-) diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index dcc6f638..136453a7 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -10,6 +10,7 @@ use anyhow::{bail, Context, Result}; use futures::future; use futures::future::{BoxFuture, FutureExt}; use futures::stream::{FuturesUnordered, StreamExt}; +use libp2p::swarm::SwarmEvent; use libp2p::{PeerId, Swarm}; use rand::rngs::OsRng; use std::collections::HashMap; @@ -80,12 +81,12 @@ where loop { tokio::select! { - swarm_event = self.swarm.next() => { + swarm_event = self.swarm.next_event() => { match swarm_event { - OutEvent::ConnectionEstablished(alice) => { + SwarmEvent::Behaviour(OutEvent::ConnectionEstablished(alice)) => { debug!("Connection Established with {}", alice); } - OutEvent::SpotPriceRequested { request, channel, peer } => { + SwarmEvent::Behaviour(OutEvent::SpotPriceRequested { request, channel, peer }) => { let btc = request.btc; let xmr = match self.handle_spot_price_request(btc, self.monero_wallet.clone()).await { Ok(xmr) => xmr, @@ -111,7 +112,7 @@ where } } } - OutEvent::QuoteRequested { channel, peer } => { + SwarmEvent::Behaviour(OutEvent::QuoteRequested { channel, peer }) => { let quote = match self.make_quote(self.max_buy).await { Ok(quote) => quote, Err(e) => { @@ -129,13 +130,13 @@ where } } } - OutEvent::ExecutionSetupDone{bob_peer_id, state3} => { + SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone{bob_peer_id, state3}) => { let _ = self.handle_execution_setup_done(bob_peer_id, *state3).await; } - OutEvent::TransferProofAcknowledged(peer) => { + SwarmEvent::Behaviour(OutEvent::TransferProofAcknowledged(peer)) => { trace!(%peer, "Bob acknowledged transfer proof"); } - OutEvent::EncryptedSignatureReceived{ msg, channel, peer } => { + SwarmEvent::Behaviour(OutEvent::EncryptedSignatureReceived{ msg, channel, peer }) => { match self.recv_encrypted_signature.remove(&peer) { Some(sender) => { // this failing just means the receiver is no longer interested ... @@ -148,10 +149,27 @@ where self.swarm.send_encrypted_signature_ack(channel); } - OutEvent::ResponseSent => {} - OutEvent::Failure {peer, error} => { + SwarmEvent::Behaviour(OutEvent::ResponseSent) => {} + SwarmEvent::Behaviour(OutEvent::Failure {peer, error}) => { error!(%peer, "Communication error: {:#}", error); } + SwarmEvent::ConnectionEstablished { peer_id: peer, endpoint, .. } => { + tracing::debug!(%peer, address = %endpoint.get_remote_address(), "New connection established"); + } + SwarmEvent::IncomingConnectionError { send_back_addr: address, error, .. } => { + tracing::warn!(%address, "Failed to set up connection with peer: {}", error); + } + SwarmEvent::ConnectionClosed { peer_id: peer, num_established, endpoint, cause } if num_established == 0 => { + match cause { + Some(error) => { + tracing::warn!(%peer, address = %endpoint.get_remote_address(), "Lost connection: {}", error); + }, + None => { + tracing::info!(%peer, address = %endpoint.get_remote_address(), "Successfully closed connection"); + } + } + } + _ => {} } }, next_transfer_proof = self.send_transfer_proof.next() => { diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index 16472190..fc40805a 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -5,6 +5,7 @@ use crate::protocol::bob::{Behaviour, OutEvent, State0, State2}; use crate::{bitcoin, monero}; use anyhow::{anyhow, bail, Context, Result}; use futures::FutureExt; +use libp2p::swarm::SwarmEvent; use libp2p::{PeerId, Swarm}; use std::convert::Infallible; use std::sync::Arc; @@ -180,34 +181,37 @@ impl EventLoop { pub async fn run(mut self) -> Result { loop { tokio::select! { - swarm_event = self.swarm.next().fuse() => { + swarm_event = self.swarm.next_event().fuse() => { match swarm_event { - OutEvent::ConnectionEstablished(peer_id) => { + SwarmEvent::Behaviour(OutEvent::ConnectionEstablished(peer_id)) => { let _ = self.conn_established.send(peer_id).await; } - OutEvent::SpotPriceReceived(msg) => { + SwarmEvent::Behaviour(OutEvent::SpotPriceReceived(msg)) => { let _ = self.recv_spot_price.send(msg).await; }, - OutEvent::QuoteReceived(msg) => { + SwarmEvent::Behaviour(OutEvent::QuoteReceived(msg)) => { let _ = self.recv_quote.send(msg).await; }, - OutEvent::ExecutionSetupDone(res) => { + SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone(res)) => { let _ = self.done_execution_setup.send(res.map(|state|*state)).await; } - OutEvent::TransferProofReceived{ msg, channel }=> { + SwarmEvent::Behaviour(OutEvent::TransferProofReceived{ msg, channel }) => { let _ = self.recv_transfer_proof.send(*msg).await; // Send back empty response so that the request/response protocol completes. if let Err(error) = self.swarm.transfer_proof.send_response(channel, ()) { error!("Failed to send Transfer Proof ack: {:?}", error); } } - OutEvent::EncryptedSignatureAcknowledged => { + SwarmEvent::Behaviour(OutEvent::EncryptedSignatureAcknowledged) => { debug!("Alice acknowledged encrypted signature"); } - OutEvent::ResponseSent => {} - OutEvent::CommunicationError(err) => { + SwarmEvent::Behaviour(OutEvent::ResponseSent) => { + + } + SwarmEvent::Behaviour(OutEvent::CommunicationError(err)) => { bail!(err.context("Communication error")) } + _ => {} } }, option = self.dial_alice.recv().fuse() => { From cde3f0f74ad7b500db607057238d68b182d69a04 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 18 Mar 2021 18:00:02 +1100 Subject: [PATCH 09/11] Remove connection handling from swap execution The swap should not be concerned with connection handling. This is the responsibility of the overall application. All but the execution-setup NetworkBehaviour are `request-response` behaviours. These have built-in functionality to automatically emit a dial attempt in case we are not connected at the time we want to send a message. We remove all of the manual dialling code from the swap in favor of this behaviour. Additionally, we make sure to establish a connection as soon as the EventLoop gets started. In case we ever loose the connection to Alice, we try to re-establish it. --- bors.toml | 3 +- swap/src/bin/swap.rs | 5 +- swap/src/network.rs | 1 - swap/src/network/peer_tracker.rs | 126 ------------------ swap/src/protocol/alice/behaviour.rs | 15 +-- swap/src/protocol/alice/event_loop.rs | 3 - swap/src/protocol/bob.rs | 19 +-- swap/src/protocol/bob/event_loop.rs | 80 ++++++----- swap/src/protocol/bob/swap.rs | 9 -- ...fore_comm.rs => happy_path_restart_bob.rs} | 30 ++++- swap/tests/testutils/mod.rs | 3 +- 11 files changed, 79 insertions(+), 215 deletions(-) delete mode 100644 swap/src/network/peer_tracker.rs rename swap/tests/{happy_path_restart_bob_before_comm.rs => happy_path_restart_bob.rs} (51%) diff --git a/bors.toml b/bors.toml index 9375744a..9a5198e7 100644 --- a/bors.toml +++ b/bors.toml @@ -7,7 +7,8 @@ status = [ "test (x86_64-unknown-linux-gnu, ubuntu-latest)", "test (x86_64-apple-darwin, macos-latest)", "docker_tests (happy_path)", - "docker_tests (happy_path_restart_bob_before_comm)", + "docker_tests (happy_path_restart_bob_after_xmr_locked)", + "docker_tests (happy_path_restart_bob_before_xmr_locked)", "docker_tests (bob_refunds_using_cancel_and_refund_command)", "docker_tests (bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force)", "docker_tests (bob_refunds_using_cancel_and_refund_command_timelock_not_expired)", diff --git a/swap/src/bin/swap.rs b/swap/src/bin/swap.rs index da3d2a23..c1d6721f 100644 --- a/swap/src/bin/swap.rs +++ b/swap/src/bin/swap.rs @@ -146,8 +146,7 @@ async fn main() -> Result<()> { tokio::select! { result = event_loop => { result - .context("EventLoop panicked")? - .context("EventLoop failed")?; + .context("EventLoop panicked")?; }, result = bob::run(swap) => { result.context("Failed to complete swap")?; @@ -210,7 +209,7 @@ async fn main() -> Result<()> { tokio::select! { event_loop_result = handle => { - event_loop_result??; + event_loop_result?; }, swap_result = bob::run(swap) => { swap_result?; diff --git a/swap/src/network.rs b/swap/src/network.rs index f279fb50..4a3fb29e 100644 --- a/swap/src/network.rs +++ b/swap/src/network.rs @@ -1,6 +1,5 @@ pub mod cbor_request_response; pub mod encrypted_signature; -pub mod peer_tracker; pub mod quote; pub mod spot_price; pub mod swarm; diff --git a/swap/src/network/peer_tracker.rs b/swap/src/network/peer_tracker.rs deleted file mode 100644 index 36a536ce..00000000 --- a/swap/src/network/peer_tracker.rs +++ /dev/null @@ -1,126 +0,0 @@ -use futures::task::Context; -use libp2p::core::connection::ConnectionId; -use libp2p::core::ConnectedPoint; -use libp2p::swarm::protocols_handler::DummyProtocolsHandler; -use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; -use libp2p::{Multiaddr, PeerId}; -use std::collections::{HashMap, VecDeque}; -use std::task::Poll; - -#[derive(Debug, Copy, Clone)] -pub enum OutEvent { - ConnectionEstablished(PeerId), -} - -/// A NetworkBehaviour that tracks connections to the counterparty. Although the -/// libp2p `NetworkBehaviour` abstraction encompasses connections to multiple -/// peers we only ever connect to a single counterparty. Peer Tracker tracks -/// that connection. -#[derive(Default, Debug)] -pub struct Behaviour { - connected: Option<(PeerId, Multiaddr)>, - address_of_peer: HashMap, - events: VecDeque, -} - -impl Behaviour { - /// Return whether we are connected to the given peer. - pub fn is_connected(&self, peer_id: &PeerId) -> bool { - if let Some((connected_peer_id, _)) = &self.connected { - if connected_peer_id == peer_id { - return true; - } - } - false - } - - /// Returns the peer id of counterparty if we are connected. - pub fn counterparty_peer_id(&self) -> Option { - if let Some((id, _)) = &self.connected { - return Some(*id); - } - None - } - - /// Returns the peer_id and multiaddr of counterparty if we are connected. - pub fn counterparty(&self) -> Option<(PeerId, Multiaddr)> { - if let Some((peer_id, addr)) = &self.connected { - return Some((*peer_id, addr.clone())); - } - None - } - - /// Add an address for a given peer. We only store one address per peer. - pub fn add_address(&mut self, peer_id: PeerId, address: Multiaddr) { - self.address_of_peer.insert(peer_id, address); - } -} - -impl NetworkBehaviour for Behaviour { - type ProtocolsHandler = DummyProtocolsHandler; - type OutEvent = OutEvent; - - fn new_handler(&mut self) -> Self::ProtocolsHandler { - DummyProtocolsHandler::default() - } - - fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { - let mut addresses: Vec = vec![]; - - if let Some((counterparty_peer_id, addr)) = self.counterparty() { - if counterparty_peer_id == *peer_id { - addresses.push(addr) - } - } - - if let Some(addr) = self.address_of_peer.get(peer_id) { - addresses.push(addr.clone()); - } - - addresses - } - - fn inject_connected(&mut self, _: &PeerId) {} - - fn inject_disconnected(&mut self, _: &PeerId) {} - - fn inject_connection_established( - &mut self, - peer: &PeerId, - _: &ConnectionId, - point: &ConnectedPoint, - ) { - match point { - ConnectedPoint::Dialer { address } => { - self.connected = Some((*peer, address.clone())); - } - ConnectedPoint::Listener { - local_addr: _, - send_back_addr, - } => { - self.connected = Some((*peer, send_back_addr.clone())); - } - } - - self.events - .push_back(OutEvent::ConnectionEstablished(*peer)); - } - - fn inject_connection_closed(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) { - self.connected = None; - } - - fn inject_event(&mut self, _: PeerId, _: ConnectionId, _: void::Void) {} - - fn poll( - &mut self, - _: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll> { - if let Some(event) = self.events.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); - } - - Poll::Pending - } -} diff --git a/swap/src/protocol/alice/behaviour.rs b/swap/src/protocol/alice/behaviour.rs index c74e0356..58be2d50 100644 --- a/swap/src/protocol/alice/behaviour.rs +++ b/swap/src/protocol/alice/behaviour.rs @@ -1,6 +1,6 @@ use crate::env::Config; use crate::network::quote::BidQuote; -use crate::network::{encrypted_signature, peer_tracker, quote, spot_price, transfer_proof}; +use crate::network::{encrypted_signature, quote, spot_price, transfer_proof}; use crate::protocol::alice::{execution_setup, State0, State3}; use crate::{bitcoin, monero}; use anyhow::{anyhow, Error, Result}; @@ -11,7 +11,6 @@ use tracing::debug; #[derive(Debug)] pub enum OutEvent { - ConnectionEstablished(PeerId), SpotPriceRequested { request: spot_price::Request, channel: ResponseChannel, @@ -38,16 +37,6 @@ pub enum OutEvent { }, } -impl From for OutEvent { - fn from(event: peer_tracker::OutEvent) -> Self { - match event { - peer_tracker::OutEvent::ConnectionEstablished(id) => { - OutEvent::ConnectionEstablished(id) - } - } - } -} - impl OutEvent { fn unexpected_request(peer: PeerId) -> OutEvent { OutEvent::Failure { @@ -177,7 +166,6 @@ impl From for OutEvent { #[behaviour(out_event = "OutEvent", event_process = false)] #[allow(missing_debug_implementations)] pub struct Behaviour { - pt: peer_tracker::Behaviour, quote: quote::Behaviour, spot_price: spot_price::Behaviour, execution_setup: execution_setup::Behaviour, @@ -188,7 +176,6 @@ pub struct Behaviour { impl Default for Behaviour { fn default() -> Self { Self { - pt: Default::default(), quote: quote::alice(), spot_price: spot_price::alice(), execution_setup: Default::default(), diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 136453a7..b95b91fb 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -83,9 +83,6 @@ where tokio::select! { swarm_event = self.swarm.next_event() => { match swarm_event { - SwarmEvent::Behaviour(OutEvent::ConnectionEstablished(alice)) => { - debug!("Connection Established with {}", alice); - } SwarmEvent::Behaviour(OutEvent::SpotPriceRequested { request, channel, peer }) => { let btc = request.btc; let xmr = match self.handle_spot_price_request(btc, self.monero_wallet.clone()).await { diff --git a/swap/src/protocol/bob.rs b/swap/src/protocol/bob.rs index 3e5a7e0c..604d879a 100644 --- a/swap/src/protocol/bob.rs +++ b/swap/src/protocol/bob.rs @@ -1,6 +1,6 @@ use crate::database::Database; use crate::env::Config; -use crate::network::{encrypted_signature, peer_tracker, spot_price}; +use crate::network::{encrypted_signature, spot_price}; use crate::protocol::bob; use crate::{bitcoin, monero}; use anyhow::{anyhow, Error, Result}; @@ -175,16 +175,6 @@ impl From for OutEvent { } } -impl From for OutEvent { - fn from(event: peer_tracker::OutEvent) -> Self { - match event { - peer_tracker::OutEvent::ConnectionEstablished(id) => { - OutEvent::ConnectionEstablished(id) - } - } - } -} - impl From for OutEvent { fn from(event: spot_price::OutEvent) -> Self { map_rr_event_to_outevent(event) @@ -244,7 +234,6 @@ impl From for OutEvent { #[behaviour(out_event = "OutEvent", event_process = false)] #[allow(missing_debug_implementations)] pub struct Behaviour { - pt: peer_tracker::Behaviour, quote: quote::Behaviour, spot_price: spot_price::Behaviour, execution_setup: execution_setup::Behaviour, @@ -255,7 +244,6 @@ pub struct Behaviour { impl Default for Behaviour { fn default() -> Self { Self { - pt: Default::default(), quote: quote::bob(), spot_price: spot_price::bob(), execution_setup: Default::default(), @@ -296,6 +284,9 @@ impl Behaviour { /// Add a known address for the given peer pub fn add_address(&mut self, peer_id: PeerId, address: Multiaddr) { - self.pt.add_address(peer_id, address) + self.quote.add_address(&peer_id, address.clone()); + self.spot_price.add_address(&peer_id, address.clone()); + self.transfer_proof.add_address(&peer_id, address.clone()); + self.encrypted_signature.add_address(&peer_id, address); } } diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index fc40805a..9f55bf2a 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -3,14 +3,13 @@ use crate::network::quote::BidQuote; use crate::network::{spot_price, transfer_proof}; use crate::protocol::bob::{Behaviour, OutEvent, State0, State2}; use crate::{bitcoin, monero}; -use anyhow::{anyhow, bail, Context, Result}; +use anyhow::{anyhow, Result}; use futures::FutureExt; use libp2p::swarm::SwarmEvent; use libp2p::{PeerId, Swarm}; -use std::convert::Infallible; use std::sync::Arc; use tokio::sync::mpsc::{Receiver, Sender}; -use tracing::{debug, error, trace}; +use tracing::{debug, error}; #[derive(Debug)] pub struct Channels { @@ -36,8 +35,6 @@ pub struct EventLoopHandle { start_execution_setup: Sender, done_execution_setup: Receiver>, recv_transfer_proof: Receiver, - conn_established: Receiver, - dial_alice: Sender<()>, send_encrypted_signature: Sender, request_spot_price: Sender, recv_spot_price: Receiver, @@ -62,19 +59,6 @@ impl EventLoopHandle { .ok_or_else(|| anyhow!("Failed to receive transfer proof from Alice")) } - /// Dials other party and wait for the connection to be established. - /// Do nothing if we are already connected - pub async fn dial(&mut self) -> Result<()> { - let _ = self.dial_alice.send(()).await?; - - self.conn_established - .recv() - .await - .ok_or_else(|| anyhow!("Failed to receive connection established from Alice"))?; - - Ok(()) - } - pub async fn request_spot_price(&mut self, btc: bitcoin::Amount) -> Result { let _ = self .request_spot_price @@ -122,7 +106,6 @@ pub struct EventLoop { start_execution_setup: Receiver, done_execution_setup: Sender>, recv_transfer_proof: Sender, - dial_alice: Receiver<()>, conn_established: Sender, send_encrypted_signature: Receiver, request_quote: Receiver<()>, @@ -138,7 +121,6 @@ impl EventLoop { let start_execution_setup = Channels::new(); let done_execution_setup = Channels::new(); let recv_transfer_proof = Channels::new(); - let dial_alice = Channels::new(); let conn_established = Channels::new(); let send_encrypted_signature = Channels::new(); let request_spot_price = Channels::new(); @@ -154,7 +136,6 @@ impl EventLoop { done_execution_setup: done_execution_setup.sender, recv_transfer_proof: recv_transfer_proof.sender, conn_established: conn_established.sender, - dial_alice: dial_alice.receiver, send_encrypted_signature: send_encrypted_signature.receiver, request_spot_price: request_spot_price.receiver, recv_spot_price: recv_spot_price.sender, @@ -166,8 +147,6 @@ impl EventLoop { start_execution_setup: start_execution_setup.sender, done_execution_setup: done_execution_setup.receiver, recv_transfer_proof: recv_transfer_proof.receiver, - conn_established: conn_established.receiver, - dial_alice: dial_alice.sender, send_encrypted_signature: send_encrypted_signature.sender, request_spot_price: request_spot_price.sender, recv_spot_price: recv_spot_price.receiver, @@ -178,7 +157,9 @@ impl EventLoop { Ok((event_loop, handle)) } - pub async fn run(mut self) -> Result { + pub async fn run(mut self) { + let _ = Swarm::dial(&mut self.swarm, &self.alice_peer_id); + loop { tokio::select! { swarm_event = self.swarm.next_event().fuse() => { @@ -188,10 +169,10 @@ impl EventLoop { } SwarmEvent::Behaviour(OutEvent::SpotPriceReceived(msg)) => { let _ = self.recv_spot_price.send(msg).await; - }, + } SwarmEvent::Behaviour(OutEvent::QuoteReceived(msg)) => { let _ = self.recv_quote.send(msg).await; - }, + } SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone(res)) => { let _ = self.done_execution_setup.send(res.map(|state|*state)).await; } @@ -208,25 +189,42 @@ impl EventLoop { SwarmEvent::Behaviour(OutEvent::ResponseSent) => { } - SwarmEvent::Behaviour(OutEvent::CommunicationError(err)) => { - bail!(err.context("Communication error")) + SwarmEvent::Behaviour(OutEvent::CommunicationError(error)) => { + tracing::warn!("Communication error: {:#}", error); + return; + } + SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } if peer_id == self.alice_peer_id => { + tracing::debug!("Connected to Alice at {}", endpoint.get_remote_address()); + } + SwarmEvent::Dialing(peer_id) if peer_id == self.alice_peer_id => { + tracing::debug!("Dialling Alice at {}", peer_id); + } + SwarmEvent::ConnectionClosed { peer_id, endpoint, num_established, cause } if peer_id == self.alice_peer_id && num_established == 0 => { + match cause { + Some(error) => { + tracing::warn!("Lost connection to Alice at {}, cause: {}", endpoint.get_remote_address(), error); + }, + None => { + // no error means the disconnection was requested + tracing::info!("Successfully closed connection to Alice"); + return; + } + } + match libp2p::Swarm::dial(&mut self.swarm, &self.alice_peer_id) { + Ok(()) => {}, + Err(e) => { + tracing::warn!("Failed to re-dial Alice: {}", e); + return; + } + } + } + SwarmEvent::UnreachableAddr { peer_id, address, attempts_remaining, error } if peer_id == self.alice_peer_id && attempts_remaining == 0 => { + tracing::warn!("Failed to dial Alice at {}: {}", address, error); } _ => {} } }, - option = self.dial_alice.recv().fuse() => { - if option.is_some() { - let peer_id = self.alice_peer_id; - if self.swarm.pt.is_connected(&peer_id) { - trace!("Already connected to Alice at {}", peer_id); - let _ = self.conn_established.send(peer_id).await; - } else { - debug!("Dialing alice at {}", peer_id); - libp2p::Swarm::dial(&mut self.swarm, &peer_id).context("Failed to dial alice")?; - } - } - }, - spot_price_request = self.request_spot_price.recv().fuse() => { + spot_price_request = self.request_spot_price.recv().fuse() => { if let Some(request) = spot_price_request { self.swarm.request_spot_price(self.alice_peer_id, request); } diff --git a/swap/src/protocol/bob/swap.rs b/swap/src/protocol/bob/swap.rs index f13e5ffc..cb264750 100644 --- a/swap/src/protocol/bob/swap.rs +++ b/swap/src/protocol/bob/swap.rs @@ -69,8 +69,6 @@ async fn run_until_internal( BobState::Started { btc_amount } => { let bitcoin_refund_address = bitcoin_wallet.new_address().await?; - event_loop_handle.dial().await?; - let state2 = request_price_and_setup( btc_amount, &mut event_loop_handle, @@ -82,8 +80,6 @@ async fn run_until_internal( BobState::ExecutionSetupDone(state2) } BobState::ExecutionSetupDone(state2) => { - // Do not lock Bitcoin if not connected to Alice. - event_loop_handle.dial().await?; // Alice and Bob have exchanged info let (state3, tx_lock) = state2.lock_btc().await?; let signed_tx = bitcoin_wallet @@ -98,8 +94,6 @@ async fn run_until_internal( // Watch for Alice to Lock Xmr or for cancel timelock to elapse BobState::BtcLocked(state3) => { if let ExpiredTimelocks::None = state3.current_epoch(bitcoin_wallet.as_ref()).await? { - event_loop_handle.dial().await?; - let transfer_proof_watcher = event_loop_handle.recv_transfer_proof(); let cancel_timelock_expires = state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()); @@ -140,8 +134,6 @@ async fn run_until_internal( monero_wallet_restore_blockheight, } => { if let ExpiredTimelocks::None = state.current_epoch(bitcoin_wallet.as_ref()).await? { - event_loop_handle.dial().await?; - let watch_request = state.lock_xmr_watch_request(lock_transfer_proof); select! { @@ -166,7 +158,6 @@ async fn run_until_internal( } BobState::XmrLocked(state) => { if let ExpiredTimelocks::None = state.expired_timelock(bitcoin_wallet.as_ref()).await? { - event_loop_handle.dial().await?; // Alice has locked Xmr // Bob sends Alice his key diff --git a/swap/tests/happy_path_restart_bob_before_comm.rs b/swap/tests/happy_path_restart_bob.rs similarity index 51% rename from swap/tests/happy_path_restart_bob_before_comm.rs rename to swap/tests/happy_path_restart_bob.rs index 6abba569..565eed60 100644 --- a/swap/tests/happy_path_restart_bob_before_comm.rs +++ b/swap/tests/happy_path_restart_bob.rs @@ -2,7 +2,7 @@ pub mod testutils; use swap::protocol::bob::BobState; use swap::protocol::{alice, bob}; -use testutils::bob_run_until::is_xmr_locked; +use testutils::bob_run_until::{is_btc_locked, is_xmr_locked}; use testutils::SlowCancelConfig; #[tokio::test] @@ -32,3 +32,31 @@ async fn given_bob_restarts_after_xmr_is_locked_resume_swap() { }) .await; } + +#[tokio::test] +async fn given_bob_restarts_before_xmr_is_locked_resume_swap() { + testutils::setup_test(SlowCancelConfig, |mut ctx| async move { + let (bob_swap, bob_join_handle) = ctx.bob_swap().await; + let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); + + let alice_swap = ctx.alice_next_swap().await; + let alice_swap = tokio::spawn(alice::run(alice_swap)); + + let bob_state = bob_swap.await??; + + assert!(matches!(bob_state, BobState::BtcLocked { .. })); + + let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); + + let bob_state = bob::run(bob_swap).await?; + + ctx.assert_bob_redeemed(bob_state).await; + + let alice_state = alice_swap.await??; + ctx.assert_alice_redeemed(alice_state).await; + + Ok(()) + }) + .await; +} diff --git a/swap/tests/testutils/mod.rs b/swap/tests/testutils/mod.rs index 1235462e..fb442c2d 100644 --- a/swap/tests/testutils/mod.rs +++ b/swap/tests/testutils/mod.rs @@ -9,7 +9,6 @@ use get_port::get_port; use libp2p::core::Multiaddr; use libp2p::{PeerId, Swarm}; use monero_harness::{image, Monero}; -use std::convert::Infallible; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; @@ -78,7 +77,7 @@ impl BobParams { } } -pub struct BobEventLoopJoinHandle(JoinHandle>); +pub struct BobEventLoopJoinHandle(JoinHandle<()>); impl BobEventLoopJoinHandle { pub fn abort(&self) { From 638a169a040adced947ec2ba258b39ff8c9faadc Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 22 Mar 2021 16:28:59 +1100 Subject: [PATCH 10/11] Buffer transfer proof if we are not connected to Bob The request-response behaviour that is used for sending the transfer proof actually has a functionality for buffering a message if we are currently not connected. However, the request-response behaviour also emits a dial attempt and **drops** all buffered messages if this dial attempt fails. For us, the dial attempt will very likely always fail because Bob is very likely behind NAT and we have to wait for him to reconnect to us. To mitigate this, we build our own buffer within the EventLoop and send transfer proofs as soon as we are connected again. Resolves #348. --- .github/workflows/ci.yml | 3 +- swap/src/protocol/alice/behaviour.rs | 17 ++++++++-- swap/src/protocol/alice/event_loop.rs | 20 ++++++++++- ...appy_path_restart_bob_after_xmr_locked.rs} | 30 +--------------- ...appy_path_restart_bob_before_xmr_locked.rs | 34 +++++++++++++++++++ 5 files changed, 71 insertions(+), 33 deletions(-) rename swap/tests/{happy_path_restart_bob.rs => happy_path_restart_bob_after_xmr_locked.rs} (51%) create mode 100644 swap/tests/happy_path_restart_bob_before_xmr_locked.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 517febb5..edbbf5a3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -114,7 +114,8 @@ jobs: matrix: test_name: [ happy_path, - happy_path_restart_bob_before_comm, + happy_path_restart_bob_after_xmr_locked, + happy_path_restart_bob_before_xmr_locked, bob_refunds_using_cancel_and_refund_command, bob_refunds_using_cancel_and_refund_command_timelock_not_expired, bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force, diff --git a/swap/src/protocol/alice/behaviour.rs b/swap/src/protocol/alice/behaviour.rs index 58be2d50..29465067 100644 --- a/swap/src/protocol/alice/behaviour.rs +++ b/swap/src/protocol/alice/behaviour.rs @@ -233,9 +233,22 @@ impl Behaviour { } /// Send Transfer Proof to Bob. - pub fn send_transfer_proof(&mut self, bob: PeerId, msg: transfer_proof::Request) { + /// + /// Fails and returns the transfer proof if we are currently not connected + /// to this peer. + pub fn send_transfer_proof( + &mut self, + bob: PeerId, + msg: transfer_proof::Request, + ) -> Result<(), transfer_proof::Request> { + if !self.transfer_proof.is_connected(&bob) { + return Err(msg); + } self.transfer_proof.send_request(&bob, msg); - debug!("Sent Transfer Proof"); + + debug!("Sending Transfer Proof"); + + Ok(()) } pub fn send_encrypted_signature_ack(&mut self, channel: ResponseChannel<()>) { diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index b95b91fb..24948741 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -38,6 +38,10 @@ pub struct EventLoop { FuturesUnordered>>, swap_sender: mpsc::Sender, + + /// Tracks [`transfer_proof::Request`]s which could not yet be sent because + /// we are currently disconnected from the peer. + buffered_transfer_proofs: HashMap, } impl EventLoop @@ -66,6 +70,7 @@ where max_buy, recv_encrypted_signature: Default::default(), send_transfer_proof: Default::default(), + buffered_transfer_proofs: Default::default(), }; Ok((event_loop, swap_channel.receiver)) } @@ -152,6 +157,14 @@ where } SwarmEvent::ConnectionEstablished { peer_id: peer, endpoint, .. } => { tracing::debug!(%peer, address = %endpoint.get_remote_address(), "New connection established"); + + if let Some(transfer_proof) = self.buffered_transfer_proofs.remove(&peer) { + tracing::debug!(%peer, "Found buffered transfer proof for peer"); + + self.swarm + .send_transfer_proof(peer, transfer_proof) + .expect("must be able to send transfer proof after connection was established"); + } } SwarmEvent::IncomingConnectionError { send_back_addr: address, error, .. } => { tracing::warn!(%address, "Failed to set up connection with peer: {}", error); @@ -172,7 +185,12 @@ where next_transfer_proof = self.send_transfer_proof.next() => { match next_transfer_proof { Some(Ok((peer, transfer_proof))) => { - self.swarm.send_transfer_proof(peer, transfer_proof); + let result = self.swarm.send_transfer_proof(peer, transfer_proof); + + if let Err(transfer_proof) = result { + tracing::warn!(%peer, "No active connection to peer, buffering transfer proof"); + self.buffered_transfer_proofs.insert(peer, transfer_proof); + } }, Some(Err(_)) => { tracing::debug!("A swap stopped without sending a transfer proof"); diff --git a/swap/tests/happy_path_restart_bob.rs b/swap/tests/happy_path_restart_bob_after_xmr_locked.rs similarity index 51% rename from swap/tests/happy_path_restart_bob.rs rename to swap/tests/happy_path_restart_bob_after_xmr_locked.rs index 565eed60..6abba569 100644 --- a/swap/tests/happy_path_restart_bob.rs +++ b/swap/tests/happy_path_restart_bob_after_xmr_locked.rs @@ -2,7 +2,7 @@ pub mod testutils; use swap::protocol::bob::BobState; use swap::protocol::{alice, bob}; -use testutils::bob_run_until::{is_btc_locked, is_xmr_locked}; +use testutils::bob_run_until::is_xmr_locked; use testutils::SlowCancelConfig; #[tokio::test] @@ -32,31 +32,3 @@ async fn given_bob_restarts_after_xmr_is_locked_resume_swap() { }) .await; } - -#[tokio::test] -async fn given_bob_restarts_before_xmr_is_locked_resume_swap() { - testutils::setup_test(SlowCancelConfig, |mut ctx| async move { - let (bob_swap, bob_join_handle) = ctx.bob_swap().await; - let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_btc_locked)); - - let alice_swap = ctx.alice_next_swap().await; - let alice_swap = tokio::spawn(alice::run(alice_swap)); - - let bob_state = bob_swap.await??; - - assert!(matches!(bob_state, BobState::BtcLocked { .. })); - - let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; - assert!(matches!(bob_swap.state, BobState::BtcLocked { .. })); - - let bob_state = bob::run(bob_swap).await?; - - ctx.assert_bob_redeemed(bob_state).await; - - let alice_state = alice_swap.await??; - ctx.assert_alice_redeemed(alice_state).await; - - Ok(()) - }) - .await; -} diff --git a/swap/tests/happy_path_restart_bob_before_xmr_locked.rs b/swap/tests/happy_path_restart_bob_before_xmr_locked.rs new file mode 100644 index 00000000..6abba569 --- /dev/null +++ b/swap/tests/happy_path_restart_bob_before_xmr_locked.rs @@ -0,0 +1,34 @@ +pub mod testutils; + +use swap::protocol::bob::BobState; +use swap::protocol::{alice, bob}; +use testutils::bob_run_until::is_xmr_locked; +use testutils::SlowCancelConfig; + +#[tokio::test] +async fn given_bob_restarts_after_xmr_is_locked_resume_swap() { + testutils::setup_test(SlowCancelConfig, |mut ctx| async move { + let (bob_swap, bob_join_handle) = ctx.bob_swap().await; + let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_xmr_locked)); + + let alice_swap = ctx.alice_next_swap().await; + let alice_swap = tokio::spawn(alice::run(alice_swap)); + + let bob_state = bob_swap.await??; + + assert!(matches!(bob_state, BobState::XmrLocked { .. })); + + let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + assert!(matches!(bob_swap.state, BobState::XmrLocked { .. })); + + let bob_state = bob::run(bob_swap).await?; + + ctx.assert_bob_redeemed(bob_state).await; + + let alice_state = alice_swap.await??; + ctx.assert_alice_redeemed(alice_state).await; + + Ok(()) + }) + .await; +} From 1057d115d1d40f33d994a82ff70a1d2e87d8212e Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 23 Mar 2021 17:26:13 +1100 Subject: [PATCH 11/11] Re-order bob::event_loop based on importance --- swap/src/protocol/bob/event_loop.rs | 170 ++++++++++++++-------------- 1 file changed, 85 insertions(+), 85 deletions(-) diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index 9f55bf2a..97e49701 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -11,91 +11,6 @@ use std::sync::Arc; use tokio::sync::mpsc::{Receiver, Sender}; use tracing::{debug, error}; -#[derive(Debug)] -pub struct Channels { - sender: Sender, - receiver: Receiver, -} - -impl Channels { - pub fn new() -> Channels { - let (sender, receiver) = tokio::sync::mpsc::channel(100); - Channels { sender, receiver } - } -} - -impl Default for Channels { - fn default() -> Self { - Self::new() - } -} - -#[derive(Debug)] -pub struct EventLoopHandle { - start_execution_setup: Sender, - done_execution_setup: Receiver>, - recv_transfer_proof: Receiver, - send_encrypted_signature: Sender, - request_spot_price: Sender, - recv_spot_price: Receiver, - request_quote: Sender<()>, - recv_quote: Receiver, -} - -impl EventLoopHandle { - pub async fn execution_setup(&mut self, state0: State0) -> Result { - let _ = self.start_execution_setup.send(state0).await?; - - self.done_execution_setup - .recv() - .await - .ok_or_else(|| anyhow!("Failed to setup execution with Alice"))? - } - - pub async fn recv_transfer_proof(&mut self) -> Result { - self.recv_transfer_proof - .recv() - .await - .ok_or_else(|| anyhow!("Failed to receive transfer proof from Alice")) - } - - pub async fn request_spot_price(&mut self, btc: bitcoin::Amount) -> Result { - let _ = self - .request_spot_price - .send(spot_price::Request { btc }) - .await?; - - let response = self - .recv_spot_price - .recv() - .await - .ok_or_else(|| anyhow!("Failed to receive spot price from Alice"))?; - - Ok(response.xmr) - } - - pub async fn request_quote(&mut self) -> Result { - let _ = self.request_quote.send(()).await?; - - let quote = self - .recv_quote - .recv() - .await - .ok_or_else(|| anyhow!("Failed to receive quote from Alice"))?; - - Ok(quote) - } - - pub async fn send_encrypted_signature( - &mut self, - tx_redeem_encsig: EncryptedSignature, - ) -> Result<()> { - self.send_encrypted_signature.send(tx_redeem_encsig).await?; - - Ok(()) - } -} - #[allow(missing_debug_implementations)] pub struct EventLoop { swarm: libp2p::Swarm, @@ -250,3 +165,88 @@ impl EventLoop { } } } + +#[derive(Debug)] +pub struct EventLoopHandle { + start_execution_setup: Sender, + done_execution_setup: Receiver>, + recv_transfer_proof: Receiver, + send_encrypted_signature: Sender, + request_spot_price: Sender, + recv_spot_price: Receiver, + request_quote: Sender<()>, + recv_quote: Receiver, +} + +impl EventLoopHandle { + pub async fn execution_setup(&mut self, state0: State0) -> Result { + let _ = self.start_execution_setup.send(state0).await?; + + self.done_execution_setup + .recv() + .await + .ok_or_else(|| anyhow!("Failed to setup execution with Alice"))? + } + + pub async fn recv_transfer_proof(&mut self) -> Result { + self.recv_transfer_proof + .recv() + .await + .ok_or_else(|| anyhow!("Failed to receive transfer proof from Alice")) + } + + pub async fn request_spot_price(&mut self, btc: bitcoin::Amount) -> Result { + let _ = self + .request_spot_price + .send(spot_price::Request { btc }) + .await?; + + let response = self + .recv_spot_price + .recv() + .await + .ok_or_else(|| anyhow!("Failed to receive spot price from Alice"))?; + + Ok(response.xmr) + } + + pub async fn request_quote(&mut self) -> Result { + let _ = self.request_quote.send(()).await?; + + let quote = self + .recv_quote + .recv() + .await + .ok_or_else(|| anyhow!("Failed to receive quote from Alice"))?; + + Ok(quote) + } + + pub async fn send_encrypted_signature( + &mut self, + tx_redeem_encsig: EncryptedSignature, + ) -> Result<()> { + self.send_encrypted_signature.send(tx_redeem_encsig).await?; + + Ok(()) + } +} + +#[derive(Debug)] +struct Channels { + sender: Sender, + receiver: Receiver, +} + +impl Channels { + fn new() -> Channels { + let (sender, receiver) = tokio::sync::mpsc::channel(100); + Channels { sender, receiver } + } +} + +impl Default for Channels { + fn default() -> Self { + Self::new() + } +}