diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 517febb5..edbbf5a3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -114,7 +114,8 @@ jobs: matrix: test_name: [ happy_path, - happy_path_restart_bob_before_comm, + happy_path_restart_bob_after_xmr_locked, + happy_path_restart_bob_before_xmr_locked, bob_refunds_using_cancel_and_refund_command, bob_refunds_using_cancel_and_refund_command_timelock_not_expired, bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force, diff --git a/bors.toml b/bors.toml index 9375744a..9a5198e7 100644 --- a/bors.toml +++ b/bors.toml @@ -7,7 +7,8 @@ status = [ "test (x86_64-unknown-linux-gnu, ubuntu-latest)", "test (x86_64-apple-darwin, macos-latest)", "docker_tests (happy_path)", - "docker_tests (happy_path_restart_bob_before_comm)", + "docker_tests (happy_path_restart_bob_after_xmr_locked)", + "docker_tests (happy_path_restart_bob_before_xmr_locked)", "docker_tests (bob_refunds_using_cancel_and_refund_command)", "docker_tests (bob_refunds_using_cancel_and_refund_command_timelock_not_expired_force)", "docker_tests (bob_refunds_using_cancel_and_refund_command_timelock_not_expired)", diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index b148ced7..b8a00da8 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -15,6 +15,7 @@ use anyhow::{Context, Result}; use bdk::descriptor::Segwitv0; use bdk::keys::DerivableKey; +use libp2p::Swarm; use prettytable::{row, Table}; use std::path::Path; use std::sync::Arc; @@ -27,7 +28,8 @@ use swap::database::Database; use swap::env::GetConfig; use swap::fs::default_config_path; use swap::monero::Amount; -use swap::protocol::alice::{run, EventLoop}; +use swap::network::swarm; +use swap::protocol::alice::{run, Behaviour, EventLoop}; use swap::seed::Seed; use swap::trace::init_tracing; use swap::{bitcoin, env, kraken, monero}; @@ -93,9 +95,12 @@ async fn main() -> Result<()> { let kraken_rate_updates = kraken::connect()?; + let mut swarm = swarm::new::(&seed)?; + Swarm::listen_on(&mut swarm, config.network.listen) + .context("Failed to listen network interface")?; + let (event_loop, mut swap_receiver) = EventLoop::new( - config.network.listen, - seed, + swarm, env_config, Arc::new(bitcoin_wallet), Arc::new(monero_wallet), diff --git a/swap/src/bin/swap.rs b/swap/src/bin/swap.rs index 7f05fa12..c1d6721f 100644 --- a/swap/src/bin/swap.rs +++ b/swap/src/bin/swap.rs @@ -25,8 +25,9 @@ use swap::cli::command::{AliceConnectParams, Arguments, Command, Data, MoneroPar use swap::database::Database; use swap::env::{Config, GetConfig}; use swap::network::quote::BidQuote; +use swap::network::swarm; use swap::protocol::bob; -use swap::protocol::bob::{Builder, EventLoop}; +use swap::protocol::bob::{Behaviour, Builder, EventLoop}; use swap::seed::Seed; use swap::{bitcoin, env, monero}; use tracing::{debug, error, info, warn, Level}; @@ -101,17 +102,17 @@ async fn main() -> Result<()> { } let bitcoin_wallet = - init_bitcoin_wallet(electrum_rpc_url, seed, data_dir.clone(), env_config).await?; + init_bitcoin_wallet(electrum_rpc_url, &seed, data_dir.clone(), env_config).await?; let (monero_wallet, _process) = init_monero_wallet(data_dir, monero_daemon_host, env_config).await?; let bitcoin_wallet = Arc::new(bitcoin_wallet); - let (event_loop, mut event_loop_handle) = EventLoop::new( - &seed.derive_libp2p_identity(), - alice_peer_id, - alice_addr, - bitcoin_wallet.clone(), - )?; - let handle = tokio::spawn(event_loop.run()); + + let mut swarm = swarm::new::(&seed)?; + swarm.add_address(alice_peer_id, alice_addr); + + let (event_loop, mut event_loop_handle) = + EventLoop::new(swarm, alice_peer_id, bitcoin_wallet.clone())?; + let event_loop = tokio::spawn(event_loop.run()); let send_bitcoin = determine_btc_to_swap( event_loop_handle.request_quote(), @@ -142,13 +143,13 @@ async fn main() -> Result<()> { .with_init_params(send_bitcoin) .build()?; - let swap = bob::run(swap); tokio::select! { - event_loop_result = handle => { - event_loop_result??; + result = event_loop => { + result + .context("EventLoop panicked")?; }, - swap_result = swap => { - swap_result?; + result = bob::run(swap) => { + result.context("Failed to complete swap")?; } } } @@ -183,17 +184,16 @@ async fn main() -> Result<()> { } let bitcoin_wallet = - init_bitcoin_wallet(electrum_rpc_url, seed, data_dir.clone(), env_config).await?; + init_bitcoin_wallet(electrum_rpc_url, &seed, data_dir.clone(), env_config).await?; let (monero_wallet, _process) = init_monero_wallet(data_dir, monero_daemon_host, env_config).await?; let bitcoin_wallet = Arc::new(bitcoin_wallet); - let (event_loop, event_loop_handle) = EventLoop::new( - &seed.derive_libp2p_identity(), - alice_peer_id, - alice_addr, - bitcoin_wallet.clone(), - )?; + let mut swarm = swarm::new::(&seed)?; + swarm.add_address(alice_peer_id, alice_addr); + + let (event_loop, event_loop_handle) = + EventLoop::new(swarm, alice_peer_id, bitcoin_wallet.clone())?; let handle = tokio::spawn(event_loop.run()); let swap = Builder::new( @@ -207,12 +207,11 @@ async fn main() -> Result<()> { ) .build()?; - let swap = bob::run(swap); tokio::select! { event_loop_result = handle => { - event_loop_result??; + event_loop_result?; }, - swap_result = swap => { + swap_result = bob::run(swap) => { swap_result?; } } @@ -223,7 +222,7 @@ async fn main() -> Result<()> { electrum_rpc_url, } => { let bitcoin_wallet = - init_bitcoin_wallet(electrum_rpc_url, seed, data_dir, env_config).await?; + init_bitcoin_wallet(electrum_rpc_url, &seed, data_dir, env_config).await?; let resume_state = db.get_state(swap_id)?.try_into_bob()?.into(); let cancel = @@ -248,7 +247,7 @@ async fn main() -> Result<()> { electrum_rpc_url, } => { let bitcoin_wallet = - init_bitcoin_wallet(electrum_rpc_url, seed, data_dir, env_config).await?; + init_bitcoin_wallet(electrum_rpc_url, &seed, data_dir, env_config).await?; let resume_state = db.get_state(swap_id)?.try_into_bob()?.into(); @@ -260,7 +259,7 @@ async fn main() -> Result<()> { async fn init_bitcoin_wallet( electrum_rpc_url: Url, - seed: Seed, + seed: &Seed, data_dir: PathBuf, env_config: Config, ) -> Result { @@ -314,7 +313,7 @@ async fn determine_btc_to_swap( ) -> Result { debug!("Requesting quote"); - let bid_quote = request_quote.await.context("Failed to request quote")?; + let bid_quote = request_quote.await?; info!("Received quote: 1 XMR ~ {}", bid_quote.price); diff --git a/swap/src/network.rs b/swap/src/network.rs index 7a82065b..4a3fb29e 100644 --- a/swap/src/network.rs +++ b/swap/src/network.rs @@ -1,21 +1,7 @@ -pub mod peer_tracker; +pub mod cbor_request_response; +pub mod encrypted_signature; pub mod quote; -pub mod request_response; pub mod spot_price; +pub mod swarm; +pub mod transfer_proof; pub mod transport; - -use libp2p::core::Executor; -use std::future::Future; -use std::pin::Pin; -use tokio::runtime::Handle; - -#[allow(missing_debug_implementations)] -pub struct TokioExecutor { - pub handle: Handle, -} - -impl Executor for TokioExecutor { - fn exec(&self, future: Pin + Send>>) { - let _ = self.handle.spawn(future); - } -} diff --git a/swap/src/network/request_response.rs b/swap/src/network/cbor_request_response.rs similarity index 84% rename from swap/src/network/request_response.rs rename to swap/src/network/cbor_request_response.rs index 5e39e347..055224c8 100644 --- a/swap/src/network/request_response.rs +++ b/swap/src/network/cbor_request_response.rs @@ -9,30 +9,9 @@ use std::fmt::Debug; use std::io; use std::marker::PhantomData; -/// Time to wait for a response back once we send a request. -pub const TIMEOUT: u64 = 3600; // One hour. - /// Message receive buffer. pub const BUF_SIZE: usize = 1024 * 1024; -#[derive(Debug, Clone, Copy, Default)] -pub struct TransferProofProtocol; - -#[derive(Debug, Clone, Copy, Default)] -pub struct EncryptedSignatureProtocol; - -impl ProtocolName for TransferProofProtocol { - fn protocol_name(&self) -> &[u8] { - b"/comit/xmr/btc/transfer_proof/1.0.0" - } -} - -impl ProtocolName for EncryptedSignatureProtocol { - fn protocol_name(&self) -> &[u8] { - b"/comit/xmr/btc/encrypted_signature/1.0.0" - } -} - #[derive(Clone, Copy, Debug)] pub struct CborCodec { phantom: PhantomData<(P, Req, Res)>, diff --git a/swap/src/network/encrypted_signature.rs b/swap/src/network/encrypted_signature.rs new file mode 100644 index 00000000..1f5d94f9 --- /dev/null +++ b/swap/src/network/encrypted_signature.rs @@ -0,0 +1,43 @@ +use crate::network::cbor_request_response::CborCodec; +use libp2p::core::ProtocolName; +use libp2p::request_response::{ + ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, + RequestResponseMessage, +}; +use serde::{Deserialize, Serialize}; + +pub type OutEvent = RequestResponseEvent; + +#[derive(Debug, Clone, Copy, Default)] +pub struct EncryptedSignatureProtocol; + +impl ProtocolName for EncryptedSignatureProtocol { + fn protocol_name(&self) -> &[u8] { + b"/comit/xmr/btc/encrypted_signature/1.0.0" + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Request { + pub tx_redeem_encsig: crate::bitcoin::EncryptedSignature, +} + +pub type Behaviour = RequestResponse>; + +pub type Message = RequestResponseMessage; + +pub fn alice() -> Behaviour { + Behaviour::new( + CborCodec::default(), + vec![(EncryptedSignatureProtocol, ProtocolSupport::Inbound)], + RequestResponseConfig::default(), + ) +} + +pub fn bob() -> Behaviour { + Behaviour::new( + CborCodec::default(), + vec![(EncryptedSignatureProtocol, ProtocolSupport::Outbound)], + RequestResponseConfig::default(), + ) +} diff --git a/swap/src/network/peer_tracker.rs b/swap/src/network/peer_tracker.rs deleted file mode 100644 index 36a536ce..00000000 --- a/swap/src/network/peer_tracker.rs +++ /dev/null @@ -1,126 +0,0 @@ -use futures::task::Context; -use libp2p::core::connection::ConnectionId; -use libp2p::core::ConnectedPoint; -use libp2p::swarm::protocols_handler::DummyProtocolsHandler; -use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; -use libp2p::{Multiaddr, PeerId}; -use std::collections::{HashMap, VecDeque}; -use std::task::Poll; - -#[derive(Debug, Copy, Clone)] -pub enum OutEvent { - ConnectionEstablished(PeerId), -} - -/// A NetworkBehaviour that tracks connections to the counterparty. Although the -/// libp2p `NetworkBehaviour` abstraction encompasses connections to multiple -/// peers we only ever connect to a single counterparty. Peer Tracker tracks -/// that connection. -#[derive(Default, Debug)] -pub struct Behaviour { - connected: Option<(PeerId, Multiaddr)>, - address_of_peer: HashMap, - events: VecDeque, -} - -impl Behaviour { - /// Return whether we are connected to the given peer. - pub fn is_connected(&self, peer_id: &PeerId) -> bool { - if let Some((connected_peer_id, _)) = &self.connected { - if connected_peer_id == peer_id { - return true; - } - } - false - } - - /// Returns the peer id of counterparty if we are connected. - pub fn counterparty_peer_id(&self) -> Option { - if let Some((id, _)) = &self.connected { - return Some(*id); - } - None - } - - /// Returns the peer_id and multiaddr of counterparty if we are connected. - pub fn counterparty(&self) -> Option<(PeerId, Multiaddr)> { - if let Some((peer_id, addr)) = &self.connected { - return Some((*peer_id, addr.clone())); - } - None - } - - /// Add an address for a given peer. We only store one address per peer. - pub fn add_address(&mut self, peer_id: PeerId, address: Multiaddr) { - self.address_of_peer.insert(peer_id, address); - } -} - -impl NetworkBehaviour for Behaviour { - type ProtocolsHandler = DummyProtocolsHandler; - type OutEvent = OutEvent; - - fn new_handler(&mut self) -> Self::ProtocolsHandler { - DummyProtocolsHandler::default() - } - - fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { - let mut addresses: Vec = vec![]; - - if let Some((counterparty_peer_id, addr)) = self.counterparty() { - if counterparty_peer_id == *peer_id { - addresses.push(addr) - } - } - - if let Some(addr) = self.address_of_peer.get(peer_id) { - addresses.push(addr.clone()); - } - - addresses - } - - fn inject_connected(&mut self, _: &PeerId) {} - - fn inject_disconnected(&mut self, _: &PeerId) {} - - fn inject_connection_established( - &mut self, - peer: &PeerId, - _: &ConnectionId, - point: &ConnectedPoint, - ) { - match point { - ConnectedPoint::Dialer { address } => { - self.connected = Some((*peer, address.clone())); - } - ConnectedPoint::Listener { - local_addr: _, - send_back_addr, - } => { - self.connected = Some((*peer, send_back_addr.clone())); - } - } - - self.events - .push_back(OutEvent::ConnectionEstablished(*peer)); - } - - fn inject_connection_closed(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) { - self.connected = None; - } - - fn inject_event(&mut self, _: PeerId, _: ConnectionId, _: void::Void) {} - - fn poll( - &mut self, - _: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll> { - if let Some(event) = self.events.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); - } - - Poll::Pending - } -} diff --git a/swap/src/network/quote.rs b/swap/src/network/quote.rs index 4e536a0b..9c649d2c 100644 --- a/swap/src/network/quote.rs +++ b/swap/src/network/quote.rs @@ -1,8 +1,9 @@ use crate::bitcoin; -use crate::network::request_response::CborCodec; +use crate::network::cbor_request_response::CborCodec; use libp2p::core::ProtocolName; use libp2p::request_response::{ ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, + RequestResponseMessage, }; use serde::{Deserialize, Serialize}; @@ -17,6 +18,8 @@ impl ProtocolName for BidQuoteProtocol { } } +pub type Message = RequestResponseMessage<(), BidQuote>; + /// Represents a quote for buying XMR. #[derive(Serialize, Deserialize, Debug, Clone)] pub struct BidQuote { diff --git a/swap/src/network/spot_price.rs b/swap/src/network/spot_price.rs index 3fcd9373..2a57db09 100644 --- a/swap/src/network/spot_price.rs +++ b/swap/src/network/spot_price.rs @@ -1,8 +1,9 @@ -use crate::network::request_response::CborCodec; +use crate::network::cbor_request_response::CborCodec; use crate::{bitcoin, monero}; use libp2p::core::ProtocolName; use libp2p::request_response::{ ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, + RequestResponseMessage, }; use serde::{Deserialize, Serialize}; @@ -39,6 +40,8 @@ pub struct Response { pub type Behaviour = RequestResponse>; +pub type Message = RequestResponseMessage; + /// Constructs a new instance of the `spot-price` behaviour to be used by Alice. /// /// Alice only supports inbound connections, i.e. providing spot prices for BTC diff --git a/swap/src/network/swarm.rs b/swap/src/network/swarm.rs new file mode 100644 index 00000000..40e9f6d4 --- /dev/null +++ b/swap/src/network/swarm.rs @@ -0,0 +1,23 @@ +use crate::network::transport; +use crate::seed::Seed; +use anyhow::Result; +use libp2p::swarm::{NetworkBehaviour, SwarmBuilder}; +use libp2p::Swarm; + +pub fn new(seed: &Seed) -> Result> +where + B: NetworkBehaviour + Default, +{ + let identity = seed.derive_libp2p_identity(); + + let behaviour = B::default(); + let transport = transport::build(&identity)?; + + let swarm = SwarmBuilder::new(transport, behaviour, identity.public().into_peer_id()) + .executor(Box::new(|f| { + tokio::spawn(f); + })) + .build(); + + Ok(swarm) +} diff --git a/swap/src/network/transfer_proof.rs b/swap/src/network/transfer_proof.rs new file mode 100644 index 00000000..3cf83adb --- /dev/null +++ b/swap/src/network/transfer_proof.rs @@ -0,0 +1,44 @@ +use crate::monero; +use crate::network::cbor_request_response::CborCodec; +use libp2p::core::ProtocolName; +use libp2p::request_response::{ + ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, + RequestResponseMessage, +}; +use serde::{Deserialize, Serialize}; + +pub type OutEvent = RequestResponseEvent; + +#[derive(Debug, Clone, Copy, Default)] +pub struct TransferProofProtocol; + +impl ProtocolName for TransferProofProtocol { + fn protocol_name(&self) -> &[u8] { + b"/comit/xmr/btc/transfer_proof/1.0.0" + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Request { + pub tx_lock_proof: monero::TransferProof, +} + +pub type Behaviour = RequestResponse>; + +pub type Message = RequestResponseMessage; + +pub fn alice() -> Behaviour { + Behaviour::new( + CborCodec::default(), + vec![(TransferProofProtocol, ProtocolSupport::Outbound)], + RequestResponseConfig::default(), + ) +} + +pub fn bob() -> Behaviour { + Behaviour::new( + CborCodec::default(), + vec![(TransferProofProtocol, ProtocolSupport::Inbound)], + RequestResponseConfig::default(), + ) +} diff --git a/swap/src/protocol/alice.rs b/swap/src/protocol/alice.rs index 28ef4bce..9bc6423d 100644 --- a/swap/src/protocol/alice.rs +++ b/swap/src/protocol/alice.rs @@ -11,16 +11,13 @@ pub use self::event_loop::{EventLoop, EventLoopHandle}; pub use self::execution_setup::Message1; pub use self::state::*; pub use self::swap::{run, run_until}; -pub use self::transfer_proof::TransferProof; pub use execution_setup::Message3; mod behaviour; -mod encrypted_signature; pub mod event_loop; mod execution_setup; pub mod state; pub mod swap; -mod transfer_proof; pub struct Swap { pub state: AliceState, diff --git a/swap/src/protocol/alice/behaviour.rs b/swap/src/protocol/alice/behaviour.rs index a7825678..29465067 100644 --- a/swap/src/protocol/alice/behaviour.rs +++ b/swap/src/protocol/alice/behaviour.rs @@ -1,22 +1,18 @@ use crate::env::Config; use crate::network::quote::BidQuote; -use crate::network::{peer_tracker, quote, spot_price}; -use crate::protocol::alice::{ - encrypted_signature, execution_setup, transfer_proof, State0, State3, TransferProof, -}; -use crate::protocol::bob::EncryptedSignature; +use crate::network::{encrypted_signature, quote, spot_price, transfer_proof}; +use crate::protocol::alice::{execution_setup, State0, State3}; use crate::{bitcoin, monero}; use anyhow::{anyhow, Error, Result}; -use libp2p::request_response::{RequestResponseMessage, ResponseChannel}; +use libp2p::request_response::{RequestResponseEvent, RequestResponseMessage, ResponseChannel}; use libp2p::{NetworkBehaviour, PeerId}; use rand::{CryptoRng, RngCore}; use tracing::debug; #[derive(Debug)] pub enum OutEvent { - ConnectionEstablished(PeerId), SpotPriceRequested { - msg: spot_price::Request, + request: spot_price::Request, channel: ResponseChannel, peer: PeerId, }, @@ -29,8 +25,8 @@ pub enum OutEvent { state3: Box, }, TransferProofAcknowledged(PeerId), - EncryptedSignature { - msg: Box, + EncryptedSignatureReceived { + msg: Box, channel: ResponseChannel<()>, peer: PeerId, }, @@ -41,72 +37,111 @@ pub enum OutEvent { }, } -impl From for OutEvent { - fn from(event: peer_tracker::OutEvent) -> Self { - match event { - peer_tracker::OutEvent::ConnectionEstablished(id) => { - OutEvent::ConnectionEstablished(id) - } +impl OutEvent { + fn unexpected_request(peer: PeerId) -> OutEvent { + OutEvent::Failure { + peer, + error: anyhow!("Unexpected request received"), + } + } + + fn unexpected_response(peer: PeerId) -> OutEvent { + OutEvent::Failure { + peer, + error: anyhow!("Unexpected response received"), + } + } +} + +impl From<(PeerId, quote::Message)> for OutEvent { + fn from((peer, message): (PeerId, quote::Message)) -> Self { + match message { + quote::Message::Request { channel, .. } => OutEvent::QuoteRequested { channel, peer }, + quote::Message::Response { .. } => OutEvent::unexpected_response(peer), + } + } +} + +impl From<(PeerId, spot_price::Message)> for OutEvent { + fn from((peer, message): (PeerId, spot_price::Message)) -> Self { + match message { + spot_price::Message::Request { + request, channel, .. + } => OutEvent::SpotPriceRequested { + request, + channel, + peer, + }, + spot_price::Message::Response { .. } => OutEvent::unexpected_response(peer), + } + } +} + +impl From<(PeerId, transfer_proof::Message)> for OutEvent { + fn from((peer, message): (PeerId, transfer_proof::Message)) -> Self { + match message { + transfer_proof::Message::Request { .. } => OutEvent::unexpected_request(peer), + transfer_proof::Message::Response { .. } => OutEvent::TransferProofAcknowledged(peer), + } + } +} + +impl From<(PeerId, encrypted_signature::Message)> for OutEvent { + fn from((peer, message): (PeerId, encrypted_signature::Message)) -> Self { + match message { + encrypted_signature::Message::Request { + request, channel, .. + } => OutEvent::EncryptedSignatureReceived { + msg: Box::new(request), + channel, + peer, + }, + encrypted_signature::Message::Response { .. } => OutEvent::unexpected_response(peer), } } } impl From for OutEvent { fn from(event: spot_price::OutEvent) -> Self { - match event { - spot_price::OutEvent::Message { - peer, - message: - RequestResponseMessage::Request { - channel, - request: msg, - .. - }, - } => OutEvent::SpotPriceRequested { msg, channel, peer }, - spot_price::OutEvent::Message { - message: RequestResponseMessage::Response { .. }, - peer, - } => OutEvent::Failure { - error: anyhow!("Alice is only meant to hand out spot prices, not receive them"), - peer, - }, - spot_price::OutEvent::ResponseSent { .. } => OutEvent::ResponseSent, - spot_price::OutEvent::InboundFailure { peer, error, .. } => OutEvent::Failure { - error: anyhow!("spot_price protocol failed due to {:?}", error), - peer, - }, - spot_price::OutEvent::OutboundFailure { peer, error, .. } => OutEvent::Failure { - error: anyhow!("spot_price protocol failed due to {:?}", error), - peer, - }, - } + map_rr_event_to_outevent(event) } } impl From for OutEvent { fn from(event: quote::OutEvent) -> Self { - match event { - quote::OutEvent::Message { - peer, - message: RequestResponseMessage::Request { channel, .. }, - } => OutEvent::QuoteRequested { channel, peer }, - quote::OutEvent::Message { - message: RequestResponseMessage::Response { .. }, - peer, - } => OutEvent::Failure { - error: anyhow!("Alice is only meant to hand out quotes, not receive them"), - peer, - }, - quote::OutEvent::ResponseSent { .. } => OutEvent::ResponseSent, - quote::OutEvent::InboundFailure { peer, error, .. } => OutEvent::Failure { - error: anyhow!("quote protocol failed due to {:?}", error), - peer, - }, - quote::OutEvent::OutboundFailure { peer, error, .. } => OutEvent::Failure { - error: anyhow!("quote protocol failed due to {:?}", error), - peer, - }, - } + map_rr_event_to_outevent(event) + } +} + +impl From for OutEvent { + fn from(event: transfer_proof::OutEvent) -> Self { + map_rr_event_to_outevent(event) + } +} + +impl From for OutEvent { + fn from(event: encrypted_signature::OutEvent) -> Self { + map_rr_event_to_outevent(event) + } +} + +fn map_rr_event_to_outevent(event: RequestResponseEvent) -> OutEvent +where + OutEvent: From<(PeerId, RequestResponseMessage)>, +{ + use RequestResponseEvent::*; + + match event { + Message { message, peer, .. } => OutEvent::from((peer, message)), + ResponseSent { .. } => OutEvent::ResponseSent, + InboundFailure { peer, error, .. } => OutEvent::Failure { + error: anyhow!("protocol failed due to {:?}", error), + peer, + }, + OutboundFailure { peer, error, .. } => OutEvent::Failure { + error: anyhow!("protocol failed due to {:?}", error), + peer, + }, } } @@ -126,43 +161,11 @@ impl From for OutEvent { } } -impl From for OutEvent { - fn from(event: transfer_proof::OutEvent) -> Self { - use crate::protocol::alice::transfer_proof::OutEvent::*; - match event { - Acknowledged(peer) => OutEvent::TransferProofAcknowledged(peer), - Failure { peer, error } => OutEvent::Failure { - peer, - error: error.context("Failure with Transfer Proof"), - }, - } - } -} - -impl From for OutEvent { - fn from(event: encrypted_signature::OutEvent) -> Self { - use crate::protocol::alice::encrypted_signature::OutEvent::*; - match event { - MsgReceived { msg, channel, peer } => OutEvent::EncryptedSignature { - msg: Box::new(msg), - channel, - peer, - }, - AckSent => OutEvent::ResponseSent, - Failure { peer, error } => OutEvent::Failure { - peer, - error: error.context("Failure with Encrypted Signature"), - }, - } - } -} - /// A `NetworkBehaviour` that represents an XMR/BTC swap node as Alice. #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent", event_process = false)] #[allow(missing_debug_implementations)] pub struct Behaviour { - pt: peer_tracker::Behaviour, quote: quote::Behaviour, spot_price: spot_price::Behaviour, execution_setup: execution_setup::Behaviour, @@ -173,12 +176,11 @@ pub struct Behaviour { impl Default for Behaviour { fn default() -> Self { Self { - pt: Default::default(), quote: quote::alice(), spot_price: spot_price::alice(), execution_setup: Default::default(), - transfer_proof: Default::default(), - encrypted_signature: Default::default(), + transfer_proof: transfer_proof::alice(), + encrypted_signature: encrypted_signature::alice(), } } } @@ -231,12 +233,25 @@ impl Behaviour { } /// Send Transfer Proof to Bob. - pub fn send_transfer_proof(&mut self, bob: PeerId, msg: TransferProof) { - self.transfer_proof.send(bob, msg); - debug!("Sent Transfer Proof"); + /// + /// Fails and returns the transfer proof if we are currently not connected + /// to this peer. + pub fn send_transfer_proof( + &mut self, + bob: PeerId, + msg: transfer_proof::Request, + ) -> Result<(), transfer_proof::Request> { + if !self.transfer_proof.is_connected(&bob) { + return Err(msg); + } + self.transfer_proof.send_request(&bob, msg); + + debug!("Sending Transfer Proof"); + + Ok(()) } - pub fn send_encrypted_signature_ack(&mut self, channel: ResponseChannel<()>) -> Result<()> { - self.encrypted_signature.send_ack(channel) + pub fn send_encrypted_signature_ack(&mut self, channel: ResponseChannel<()>) { + let _ = self.encrypted_signature.send_response(channel, ()); } } diff --git a/swap/src/protocol/alice/encrypted_signature.rs b/swap/src/protocol/alice/encrypted_signature.rs deleted file mode 100644 index 1cf1f14c..00000000 --- a/swap/src/protocol/alice/encrypted_signature.rs +++ /dev/null @@ -1,95 +0,0 @@ -use crate::network::request_response::{CborCodec, EncryptedSignatureProtocol, TIMEOUT}; -use crate::protocol::bob::EncryptedSignature; -use anyhow::{anyhow, Error, Result}; -use libp2p::request_response::{ - ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, - RequestResponseMessage, ResponseChannel, -}; -use libp2p::{NetworkBehaviour, PeerId}; -use std::time::Duration; -use tracing::debug; - -#[derive(Debug)] -pub enum OutEvent { - MsgReceived { - msg: EncryptedSignature, - channel: ResponseChannel<()>, - peer: PeerId, - }, - AckSent, - Failure { - peer: PeerId, - error: Error, - }, -} - -/// A `NetworkBehaviour` that represents receiving the Bitcoin encrypted -/// signature from Bob. -#[derive(NetworkBehaviour)] -#[behaviour(out_event = "OutEvent", event_process = false)] -#[allow(missing_debug_implementations)] -pub struct Behaviour { - rr: RequestResponse>, -} - -impl Behaviour { - pub fn send_ack(&mut self, channel: ResponseChannel<()>) -> Result<()> { - self.rr - .send_response(channel, ()) - .map_err(|err| anyhow!("Failed to ack encrypted signature: {:?}", err)) - } -} - -impl Default for Behaviour { - fn default() -> Self { - let timeout = Duration::from_secs(TIMEOUT); - let mut config = RequestResponseConfig::default(); - config.set_request_timeout(timeout); - - Self { - rr: RequestResponse::new( - CborCodec::default(), - vec![(EncryptedSignatureProtocol, ProtocolSupport::Inbound)], - config, - ), - } - } -} - -impl From> for OutEvent { - fn from(event: RequestResponseEvent) -> Self { - match event { - RequestResponseEvent::Message { - peer, - message: - RequestResponseMessage::Request { - request, channel, .. - }, - .. - } => { - debug!("Received encrypted signature from {}", peer); - OutEvent::MsgReceived { - msg: request, - channel, - peer, - } - } - RequestResponseEvent::Message { - message: RequestResponseMessage::Response { .. }, - peer, - } => OutEvent::Failure { - peer, - error: anyhow!("Alice should not get a Response"), - }, - RequestResponseEvent::InboundFailure { error, peer, .. } => OutEvent::Failure { - peer, - error: anyhow!("Inbound failure: {:?}", error), - }, - RequestResponseEvent::OutboundFailure { error, peer, .. } => OutEvent::Failure { - peer, - error: anyhow!("Outbound failure: {:?}", error), - }, - RequestResponseEvent::ResponseSent { .. } => OutEvent::AckSent, - } - } -} diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index c04dbdb8..24948741 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -3,16 +3,14 @@ use crate::database::Database; use crate::env::Config; use crate::monero::BalanceTooLow; use crate::network::quote::BidQuote; -use crate::network::{spot_price, transport, TokioExecutor}; -use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State3, Swap, TransferProof}; -use crate::protocol::bob::EncryptedSignature; -use crate::seed::Seed; +use crate::network::{encrypted_signature, spot_price, transfer_proof}; +use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State3, Swap}; use crate::{bitcoin, kraken, monero}; use anyhow::{bail, Context, Result}; use futures::future; use futures::future::{BoxFuture, FutureExt}; use futures::stream::{FuturesUnordered, StreamExt}; -use libp2p::core::Multiaddr; +use libp2p::swarm::SwarmEvent; use libp2p::{PeerId, Swarm}; use rand::rngs::OsRng; use std::collections::HashMap; @@ -25,7 +23,6 @@ use uuid::Uuid; #[allow(missing_debug_implementations)] pub struct EventLoop { swarm: libp2p::Swarm, - peer_id: PeerId, env_config: Config, bitcoin_wallet: Arc, monero_wallet: Arc, @@ -34,22 +31,25 @@ pub struct EventLoop { max_buy: bitcoin::Amount, /// Stores a sender per peer for incoming [`EncryptedSignature`]s. - recv_encrypted_signature: HashMap>, + recv_encrypted_signature: HashMap>, /// Stores a list of futures, waiting for transfer proof which will be sent /// to the given peer. - send_transfer_proof: FuturesUnordered>>, + send_transfer_proof: + FuturesUnordered>>, swap_sender: mpsc::Sender, + + /// Tracks [`transfer_proof::Request`]s which could not yet be sent because + /// we are currently disconnected from the peer. + buffered_transfer_proofs: HashMap, } impl EventLoop where LR: LatestRate, { - #[allow(clippy::too_many_arguments)] pub fn new( - listen_address: Multiaddr, - seed: Seed, + swarm: Swarm, env_config: Config, bitcoin_wallet: Arc, monero_wallet: Arc, @@ -57,25 +57,10 @@ where latest_rate: LR, max_buy: bitcoin::Amount, ) -> Result<(Self, mpsc::Receiver)> { - let identity = seed.derive_libp2p_identity(); - let behaviour = Behaviour::default(); - let transport = transport::build(&identity)?; - let peer_id = PeerId::from(identity.public()); - - let mut swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, peer_id) - .executor(Box::new(TokioExecutor { - handle: tokio::runtime::Handle::current(), - })) - .build(); - - Swarm::listen_on(&mut swarm, listen_address.clone()) - .with_context(|| format!("Address is not supported: {:#}", listen_address))?; - let swap_channel = MpscChannels::default(); let event_loop = EventLoop { swarm, - peer_id, env_config, bitcoin_wallet, monero_wallet, @@ -85,12 +70,13 @@ where max_buy, recv_encrypted_signature: Default::default(), send_transfer_proof: Default::default(), + buffered_transfer_proofs: Default::default(), }; Ok((event_loop, swap_channel.receiver)) } pub fn peer_id(&self) -> PeerId { - self.peer_id + *Swarm::local_peer_id(&self.swarm) } pub async fn run(mut self) { @@ -100,13 +86,10 @@ where loop { tokio::select! { - swarm_event = self.swarm.next() => { + swarm_event = self.swarm.next_event() => { match swarm_event { - OutEvent::ConnectionEstablished(alice) => { - debug!("Connection Established with {}", alice); - } - OutEvent::SpotPriceRequested { msg, channel, peer } => { - let btc = msg.btc; + SwarmEvent::Behaviour(OutEvent::SpotPriceRequested { request, channel, peer }) => { + let btc = request.btc; let xmr = match self.handle_spot_price_request(btc, self.monero_wallet.clone()).await { Ok(xmr) => xmr, Err(e) => { @@ -131,7 +114,7 @@ where } } } - OutEvent::QuoteRequested { channel, peer } => { + SwarmEvent::Behaviour(OutEvent::QuoteRequested { channel, peer }) => { let quote = match self.make_quote(self.max_buy).await { Ok(quote) => quote, Err(e) => { @@ -149,13 +132,13 @@ where } } } - OutEvent::ExecutionSetupDone{bob_peer_id, state3} => { + SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone{bob_peer_id, state3}) => { let _ = self.handle_execution_setup_done(bob_peer_id, *state3).await; } - OutEvent::TransferProofAcknowledged(peer) => { + SwarmEvent::Behaviour(OutEvent::TransferProofAcknowledged(peer)) => { trace!(%peer, "Bob acknowledged transfer proof"); } - OutEvent::EncryptedSignature{ msg, channel, peer } => { + SwarmEvent::Behaviour(OutEvent::EncryptedSignatureReceived{ msg, channel, peer }) => { match self.recv_encrypted_signature.remove(&peer) { Some(sender) => { // this failing just means the receiver is no longer interested ... @@ -166,20 +149,48 @@ where } } - if let Err(error) = self.swarm.send_encrypted_signature_ack(channel) { - error!("Failed to send Encrypted Signature ack: {:?}", error); - } + self.swarm.send_encrypted_signature_ack(channel); } - OutEvent::ResponseSent => {} - OutEvent::Failure {peer, error} => { + SwarmEvent::Behaviour(OutEvent::ResponseSent) => {} + SwarmEvent::Behaviour(OutEvent::Failure {peer, error}) => { error!(%peer, "Communication error: {:#}", error); } + SwarmEvent::ConnectionEstablished { peer_id: peer, endpoint, .. } => { + tracing::debug!(%peer, address = %endpoint.get_remote_address(), "New connection established"); + + if let Some(transfer_proof) = self.buffered_transfer_proofs.remove(&peer) { + tracing::debug!(%peer, "Found buffered transfer proof for peer"); + + self.swarm + .send_transfer_proof(peer, transfer_proof) + .expect("must be able to send transfer proof after connection was established"); + } + } + SwarmEvent::IncomingConnectionError { send_back_addr: address, error, .. } => { + tracing::warn!(%address, "Failed to set up connection with peer: {}", error); + } + SwarmEvent::ConnectionClosed { peer_id: peer, num_established, endpoint, cause } if num_established == 0 => { + match cause { + Some(error) => { + tracing::warn!(%peer, address = %endpoint.get_remote_address(), "Lost connection: {}", error); + }, + None => { + tracing::info!(%peer, address = %endpoint.get_remote_address(), "Successfully closed connection"); + } + } + } + _ => {} } }, next_transfer_proof = self.send_transfer_proof.next() => { match next_transfer_proof { Some(Ok((peer, transfer_proof))) => { - self.swarm.send_transfer_proof(peer, transfer_proof); + let result = self.swarm.send_transfer_proof(peer, transfer_proof); + + if let Err(transfer_proof) = result { + tracing::warn!(%peer, "No active connection to peer, buffering transfer proof"); + self.buffered_transfer_proofs.insert(peer, transfer_proof); + } }, Some(Err(_)) => { tracing::debug!("A swap stopped without sending a transfer proof"); @@ -306,8 +317,8 @@ impl LatestRate for kraken::RateUpdateStream { #[derive(Debug)] pub struct EventLoopHandle { - recv_encrypted_signature: Option>, - send_transfer_proof: Option>, + recv_encrypted_signature: Option>, + send_transfer_proof: Option>, } impl EventLoopHandle { @@ -327,7 +338,7 @@ impl EventLoopHandle { .send_transfer_proof .take() .context("Transfer proof was already sent")? - .send(TransferProof { tx_lock_proof: msg }) + .send(transfer_proof::Request { tx_lock_proof: msg }) .is_err() { bail!("Failed to send transfer proof, receiver no longer listening?") diff --git a/swap/src/protocol/alice/execution_setup.rs b/swap/src/protocol/alice/execution_setup.rs index b2ec5162..965b6bd6 100644 --- a/swap/src/protocol/alice/execution_setup.rs +++ b/swap/src/protocol/alice/execution_setup.rs @@ -1,5 +1,5 @@ use crate::bitcoin::{EncryptedSignature, Signature}; -use crate::network::request_response::BUF_SIZE; +use crate::network::cbor_request_response::BUF_SIZE; use crate::protocol::alice::{State0, State3}; use crate::protocol::bob::{Message0, Message2, Message4}; use crate::{bitcoin, monero}; diff --git a/swap/src/protocol/alice/transfer_proof.rs b/swap/src/protocol/alice/transfer_proof.rs deleted file mode 100644 index 2d03add8..00000000 --- a/swap/src/protocol/alice/transfer_proof.rs +++ /dev/null @@ -1,82 +0,0 @@ -use crate::monero; -use crate::network::request_response::{CborCodec, TransferProofProtocol, TIMEOUT}; -use anyhow::{anyhow, Error}; -use libp2p::request_response::{ - ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, - RequestResponseMessage, -}; -use libp2p::{NetworkBehaviour, PeerId}; -use serde::{Deserialize, Serialize}; -use std::time::Duration; - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct TransferProof { - pub tx_lock_proof: monero::TransferProof, -} - -#[derive(Debug)] -pub enum OutEvent { - Acknowledged(PeerId), - Failure { peer: PeerId, error: Error }, -} - -/// A `NetworkBehaviour` that represents sending the Monero transfer proof to -/// Bob. -#[derive(NetworkBehaviour)] -#[behaviour(out_event = "OutEvent", event_process = false)] -#[allow(missing_debug_implementations)] -pub struct Behaviour { - rr: RequestResponse>, -} - -impl Behaviour { - pub fn send(&mut self, bob: PeerId, msg: TransferProof) { - let _id = self.rr.send_request(&bob, msg); - } -} - -impl Default for Behaviour { - fn default() -> Self { - let timeout = Duration::from_secs(TIMEOUT); - let mut config = RequestResponseConfig::default(); - config.set_request_timeout(timeout); - - Self { - rr: RequestResponse::new( - CborCodec::default(), - vec![(TransferProofProtocol, ProtocolSupport::Outbound)], - config, - ), - } - } -} - -impl From> for OutEvent { - fn from(event: RequestResponseEvent) -> Self { - match event { - RequestResponseEvent::Message { - message: RequestResponseMessage::Request { .. }, - peer, - } => OutEvent::Failure { - peer, - error: anyhow!("Alice should never get a transfer proof request from Bob"), - }, - RequestResponseEvent::Message { - message: RequestResponseMessage::Response { .. }, - peer, - } => OutEvent::Acknowledged(peer), - RequestResponseEvent::InboundFailure { error, peer, .. } => OutEvent::Failure { - peer, - error: anyhow!("Inbound failure: {:?}", error), - }, - RequestResponseEvent::OutboundFailure { error, peer, .. } => OutEvent::Failure { - peer, - error: anyhow!("Outbound failure: {:?}", error), - }, - RequestResponseEvent::ResponseSent { peer, .. } => OutEvent::Failure { - peer, - error: anyhow!("Alice should not send a response"), - }, - } - } -} diff --git a/swap/src/protocol/bob.rs b/swap/src/protocol/bob.rs index 62103aea..604d879a 100644 --- a/swap/src/protocol/bob.rs +++ b/swap/src/protocol/bob.rs @@ -1,35 +1,31 @@ use crate::database::Database; use crate::env::Config; -use crate::network::{peer_tracker, spot_price}; -use crate::protocol::alice::TransferProof; +use crate::network::{encrypted_signature, spot_price}; use crate::protocol::bob; use crate::{bitcoin, monero}; use anyhow::{anyhow, Error, Result}; pub use execution_setup::{Message0, Message2, Message4}; use libp2p::core::Multiaddr; -use libp2p::request_response::{RequestResponseMessage, ResponseChannel}; +use libp2p::request_response::{RequestResponseEvent, RequestResponseMessage, ResponseChannel}; use libp2p::{NetworkBehaviour, PeerId}; use std::sync::Arc; use tracing::debug; use uuid::Uuid; pub use self::cancel::cancel; -pub use self::encrypted_signature::EncryptedSignature; pub use self::event_loop::{EventLoop, EventLoopHandle}; pub use self::refund::refund; pub use self::state::*; pub use self::swap::{run, run_until}; -use crate::network::quote; use crate::network::quote::BidQuote; +use crate::network::{quote, transfer_proof}; pub mod cancel; -mod encrypted_signature; pub mod event_loop; mod execution_setup; pub mod refund; pub mod state; pub mod swap; -mod transfer_proof; pub struct Swap { pub state: BobState, @@ -117,8 +113,8 @@ pub enum OutEvent { QuoteReceived(BidQuote), SpotPriceReceived(spot_price::Response), ExecutionSetupDone(Result>), - TransferProof { - msg: Box, + TransferProofReceived { + msg: Box, channel: ResponseChannel<()>, }, EncryptedSignatureAcknowledged, @@ -126,11 +122,54 @@ pub enum OutEvent { CommunicationError(Error), } -impl From for OutEvent { - fn from(event: peer_tracker::OutEvent) -> Self { - match event { - peer_tracker::OutEvent::ConnectionEstablished(id) => { - OutEvent::ConnectionEstablished(id) +impl OutEvent { + fn unexpected_request() -> OutEvent { + OutEvent::CommunicationError(anyhow!("Unexpected request received")) + } + + fn unexpected_response() -> OutEvent { + OutEvent::CommunicationError(anyhow!("Unexpected response received")) + } +} + +impl From for OutEvent { + fn from(message: quote::Message) -> Self { + match message { + quote::Message::Request { .. } => OutEvent::unexpected_request(), + quote::Message::Response { response, .. } => OutEvent::QuoteReceived(response), + } + } +} + +impl From for OutEvent { + fn from(message: spot_price::Message) -> Self { + match message { + spot_price::Message::Request { .. } => OutEvent::unexpected_request(), + spot_price::Message::Response { response, .. } => OutEvent::SpotPriceReceived(response), + } + } +} + +impl From for OutEvent { + fn from(message: transfer_proof::Message) -> Self { + match message { + transfer_proof::Message::Request { + request, channel, .. + } => OutEvent::TransferProofReceived { + msg: Box::new(request), + channel, + }, + transfer_proof::Message::Response { .. } => OutEvent::unexpected_response(), + } + } +} + +impl From for OutEvent { + fn from(message: encrypted_signature::Message) -> Self { + match message { + encrypted_signature::Message::Request { .. } => OutEvent::unexpected_request(), + encrypted_signature::Message::Response { .. } => { + OutEvent::EncryptedSignatureAcknowledged } } } @@ -138,65 +177,47 @@ impl From for OutEvent { impl From for OutEvent { fn from(event: spot_price::OutEvent) -> Self { - match event { - spot_price::OutEvent::Message { - message: RequestResponseMessage::Response { response, .. }, - .. - } => OutEvent::SpotPriceReceived(response), - spot_price::OutEvent::Message { - message: RequestResponseMessage::Request { .. }, - .. - } => OutEvent::CommunicationError(anyhow!( - "Bob is only meant to receive spot prices, not hand them out" - )), - spot_price::OutEvent::ResponseSent { .. } => OutEvent::ResponseSent, - spot_price::OutEvent::InboundFailure { peer, error, .. } => { - OutEvent::CommunicationError(anyhow!( - "spot_price protocol with peer {} failed due to {:?}", - peer, - error - )) - } - spot_price::OutEvent::OutboundFailure { peer, error, .. } => { - OutEvent::CommunicationError(anyhow!( - "spot_price protocol with peer {} failed due to {:?}", - peer, - error - )) - } - } + map_rr_event_to_outevent(event) } } impl From for OutEvent { fn from(event: quote::OutEvent) -> Self { - match event { - quote::OutEvent::Message { - message: RequestResponseMessage::Response { response, .. }, - .. - } => OutEvent::QuoteReceived(response), - quote::OutEvent::Message { - message: RequestResponseMessage::Request { .. }, - .. - } => OutEvent::CommunicationError(anyhow!( - "Bob is only meant to receive quotes, not hand them out" - )), - quote::OutEvent::ResponseSent { .. } => OutEvent::ResponseSent, - quote::OutEvent::InboundFailure { peer, error, .. } => { - OutEvent::CommunicationError(anyhow!( - "quote protocol with peer {} failed due to {:?}", - peer, - error - )) - } - quote::OutEvent::OutboundFailure { peer, error, .. } => { - OutEvent::CommunicationError(anyhow!( - "quote protocol with peer {} failed due to {:?}", - peer, - error - )) - } - } + map_rr_event_to_outevent(event) + } +} + +impl From for OutEvent { + fn from(event: transfer_proof::OutEvent) -> Self { + map_rr_event_to_outevent(event) + } +} + +impl From for OutEvent { + fn from(event: encrypted_signature::OutEvent) -> Self { + map_rr_event_to_outevent(event) + } +} + +fn map_rr_event_to_outevent(event: RequestResponseEvent) -> OutEvent +where + OutEvent: From>, +{ + use RequestResponseEvent::*; + + match event { + Message { message, .. } => OutEvent::from(message), + ResponseSent { .. } => OutEvent::ResponseSent, + InboundFailure { peer, error, .. } => OutEvent::CommunicationError(anyhow!( + "protocol with peer {} failed due to {:?}", + peer, + error + )), + OutboundFailure { peer, error, .. } => OutEvent::CommunicationError(anyhow!( + "protocol with peer {} failed due to {:?}", + peer, + error + )), } } @@ -208,40 +229,11 @@ impl From for OutEvent { } } -impl From for OutEvent { - fn from(event: transfer_proof::OutEvent) -> Self { - use transfer_proof::OutEvent::*; - match event { - MsgReceived { msg, channel } => OutEvent::TransferProof { - msg: Box::new(msg), - channel, - }, - AckSent => OutEvent::ResponseSent, - Failure(err) => { - OutEvent::CommunicationError(err.context("Failure with Transfer Proof")) - } - } - } -} - -impl From for OutEvent { - fn from(event: encrypted_signature::OutEvent) -> Self { - use encrypted_signature::OutEvent::*; - match event { - Acknowledged => OutEvent::EncryptedSignatureAcknowledged, - Failure(err) => { - OutEvent::CommunicationError(err.context("Failure with Encrypted Signature")) - } - } - } -} - /// A `NetworkBehaviour` that represents an XMR/BTC swap node as Bob. #[derive(NetworkBehaviour)] #[behaviour(out_event = "OutEvent", event_process = false)] #[allow(missing_debug_implementations)] pub struct Behaviour { - pt: peer_tracker::Behaviour, quote: quote::Behaviour, spot_price: spot_price::Behaviour, execution_setup: execution_setup::Behaviour, @@ -252,12 +244,11 @@ pub struct Behaviour { impl Default for Behaviour { fn default() -> Self { Self { - pt: Default::default(), quote: quote::bob(), spot_price: spot_price::bob(), execution_setup: Default::default(), - transfer_proof: Default::default(), - encrypted_signature: Default::default(), + transfer_proof: transfer_proof::bob(), + encrypted_signature: encrypted_signature::bob(), } } } @@ -286,13 +277,16 @@ impl Behaviour { alice: PeerId, tx_redeem_encsig: bitcoin::EncryptedSignature, ) { - let msg = EncryptedSignature { tx_redeem_encsig }; - self.encrypted_signature.send(alice, msg); + let msg = encrypted_signature::Request { tx_redeem_encsig }; + self.encrypted_signature.send_request(&alice, msg); debug!("Encrypted signature sent"); } /// Add a known address for the given peer pub fn add_address(&mut self, peer_id: PeerId, address: Multiaddr) { - self.pt.add_address(peer_id, address) + self.quote.add_address(&peer_id, address.clone()); + self.spot_price.add_address(&peer_id, address.clone()); + self.transfer_proof.add_address(&peer_id, address.clone()); + self.encrypted_signature.add_address(&peer_id, address); } } diff --git a/swap/src/protocol/bob/encrypted_signature.rs b/swap/src/protocol/bob/encrypted_signature.rs deleted file mode 100644 index ed121975..00000000 --- a/swap/src/protocol/bob/encrypted_signature.rs +++ /dev/null @@ -1,74 +0,0 @@ -use crate::network::request_response::{CborCodec, EncryptedSignatureProtocol, TIMEOUT}; -use anyhow::{anyhow, Error}; -use libp2p::request_response::{ - ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, - RequestResponseMessage, -}; -use libp2p::{NetworkBehaviour, PeerId}; -use serde::{Deserialize, Serialize}; -use std::time::Duration; - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct EncryptedSignature { - pub tx_redeem_encsig: crate::bitcoin::EncryptedSignature, -} - -#[derive(Debug)] -pub enum OutEvent { - Acknowledged, - Failure(Error), -} - -/// A `NetworkBehaviour` that represents sending encrypted signature to Alice. -#[derive(NetworkBehaviour)] -#[behaviour(out_event = "OutEvent", event_process = false)] -#[allow(missing_debug_implementations)] -pub struct Behaviour { - rr: RequestResponse>, -} - -impl Behaviour { - pub fn send(&mut self, alice: PeerId, msg: EncryptedSignature) { - let _id = self.rr.send_request(&alice, msg); - } -} - -impl Default for Behaviour { - fn default() -> Self { - let timeout = Duration::from_secs(TIMEOUT); - let mut config = RequestResponseConfig::default(); - config.set_request_timeout(timeout); - - Self { - rr: RequestResponse::new( - CborCodec::default(), - vec![(EncryptedSignatureProtocol, ProtocolSupport::Outbound)], - config, - ), - } - } -} - -impl From> for OutEvent { - fn from(event: RequestResponseEvent) -> Self { - match event { - RequestResponseEvent::Message { - message: RequestResponseMessage::Request { .. }, - .. - } => OutEvent::Failure(anyhow!("Bob should never get a request from Alice")), - RequestResponseEvent::Message { - message: RequestResponseMessage::Response { .. }, - .. - } => OutEvent::Acknowledged, - RequestResponseEvent::InboundFailure { error, .. } => { - OutEvent::Failure(anyhow!("Inbound failure: {:?}", error)) - } - RequestResponseEvent::OutboundFailure { error, .. } => { - OutEvent::Failure(anyhow!("Outbound failure: {:?}", error)) - } - RequestResponseEvent::ResponseSent { .. } => OutEvent::Failure(anyhow!( - "Bob does not send the encrypted signature response to Alice" - )), - } - } -} diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index 1cc1b3b9..97e49701 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -1,34 +1,168 @@ use crate::bitcoin::EncryptedSignature; use crate::network::quote::BidQuote; -use crate::network::{spot_price, transport, TokioExecutor}; -use crate::protocol::alice::TransferProof; +use crate::network::{spot_price, transfer_proof}; use crate::protocol::bob::{Behaviour, OutEvent, State0, State2}; use crate::{bitcoin, monero}; -use anyhow::{anyhow, bail, Context, Result}; +use anyhow::{anyhow, Result}; use futures::FutureExt; -use libp2p::core::Multiaddr; -use libp2p::PeerId; -use std::convert::Infallible; +use libp2p::swarm::SwarmEvent; +use libp2p::{PeerId, Swarm}; use std::sync::Arc; use tokio::sync::mpsc::{Receiver, Sender}; -use tracing::{debug, error, trace}; +use tracing::{debug, error}; -#[derive(Debug)] -pub struct Channels { - sender: Sender, - receiver: Receiver, +#[allow(missing_debug_implementations)] +pub struct EventLoop { + swarm: libp2p::Swarm, + bitcoin_wallet: Arc, + alice_peer_id: PeerId, + request_spot_price: Receiver, + recv_spot_price: Sender, + start_execution_setup: Receiver, + done_execution_setup: Sender>, + recv_transfer_proof: Sender, + conn_established: Sender, + send_encrypted_signature: Receiver, + request_quote: Receiver<()>, + recv_quote: Sender, } -impl Channels { - pub fn new() -> Channels { - let (sender, receiver) = tokio::sync::mpsc::channel(100); - Channels { sender, receiver } +impl EventLoop { + pub fn new( + swarm: Swarm, + alice_peer_id: PeerId, + bitcoin_wallet: Arc, + ) -> Result<(Self, EventLoopHandle)> { + let start_execution_setup = Channels::new(); + let done_execution_setup = Channels::new(); + let recv_transfer_proof = Channels::new(); + let conn_established = Channels::new(); + let send_encrypted_signature = Channels::new(); + let request_spot_price = Channels::new(); + let recv_spot_price = Channels::new(); + let request_quote = Channels::new(); + let recv_quote = Channels::new(); + + let event_loop = EventLoop { + swarm, + alice_peer_id, + bitcoin_wallet, + start_execution_setup: start_execution_setup.receiver, + done_execution_setup: done_execution_setup.sender, + recv_transfer_proof: recv_transfer_proof.sender, + conn_established: conn_established.sender, + send_encrypted_signature: send_encrypted_signature.receiver, + request_spot_price: request_spot_price.receiver, + recv_spot_price: recv_spot_price.sender, + request_quote: request_quote.receiver, + recv_quote: recv_quote.sender, + }; + + let handle = EventLoopHandle { + start_execution_setup: start_execution_setup.sender, + done_execution_setup: done_execution_setup.receiver, + recv_transfer_proof: recv_transfer_proof.receiver, + send_encrypted_signature: send_encrypted_signature.sender, + request_spot_price: request_spot_price.sender, + recv_spot_price: recv_spot_price.receiver, + request_quote: request_quote.sender, + recv_quote: recv_quote.receiver, + }; + + Ok((event_loop, handle)) } -} -impl Default for Channels { - fn default() -> Self { - Self::new() + pub async fn run(mut self) { + let _ = Swarm::dial(&mut self.swarm, &self.alice_peer_id); + + loop { + tokio::select! { + swarm_event = self.swarm.next_event().fuse() => { + match swarm_event { + SwarmEvent::Behaviour(OutEvent::ConnectionEstablished(peer_id)) => { + let _ = self.conn_established.send(peer_id).await; + } + SwarmEvent::Behaviour(OutEvent::SpotPriceReceived(msg)) => { + let _ = self.recv_spot_price.send(msg).await; + } + SwarmEvent::Behaviour(OutEvent::QuoteReceived(msg)) => { + let _ = self.recv_quote.send(msg).await; + } + SwarmEvent::Behaviour(OutEvent::ExecutionSetupDone(res)) => { + let _ = self.done_execution_setup.send(res.map(|state|*state)).await; + } + SwarmEvent::Behaviour(OutEvent::TransferProofReceived{ msg, channel }) => { + let _ = self.recv_transfer_proof.send(*msg).await; + // Send back empty response so that the request/response protocol completes. + if let Err(error) = self.swarm.transfer_proof.send_response(channel, ()) { + error!("Failed to send Transfer Proof ack: {:?}", error); + } + } + SwarmEvent::Behaviour(OutEvent::EncryptedSignatureAcknowledged) => { + debug!("Alice acknowledged encrypted signature"); + } + SwarmEvent::Behaviour(OutEvent::ResponseSent) => { + + } + SwarmEvent::Behaviour(OutEvent::CommunicationError(error)) => { + tracing::warn!("Communication error: {:#}", error); + return; + } + SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } if peer_id == self.alice_peer_id => { + tracing::debug!("Connected to Alice at {}", endpoint.get_remote_address()); + } + SwarmEvent::Dialing(peer_id) if peer_id == self.alice_peer_id => { + tracing::debug!("Dialling Alice at {}", peer_id); + } + SwarmEvent::ConnectionClosed { peer_id, endpoint, num_established, cause } if peer_id == self.alice_peer_id && num_established == 0 => { + match cause { + Some(error) => { + tracing::warn!("Lost connection to Alice at {}, cause: {}", endpoint.get_remote_address(), error); + }, + None => { + // no error means the disconnection was requested + tracing::info!("Successfully closed connection to Alice"); + return; + } + } + match libp2p::Swarm::dial(&mut self.swarm, &self.alice_peer_id) { + Ok(()) => {}, + Err(e) => { + tracing::warn!("Failed to re-dial Alice: {}", e); + return; + } + } + } + SwarmEvent::UnreachableAddr { peer_id, address, attempts_remaining, error } if peer_id == self.alice_peer_id && attempts_remaining == 0 => { + tracing::warn!("Failed to dial Alice at {}: {}", address, error); + } + _ => {} + } + }, + spot_price_request = self.request_spot_price.recv().fuse() => { + if let Some(request) = spot_price_request { + self.swarm.request_spot_price(self.alice_peer_id, request); + } + }, + quote_request = self.request_quote.recv().fuse() => { + if quote_request.is_some() { + self.swarm.request_quote(self.alice_peer_id); + } + }, + option = self.start_execution_setup.recv().fuse() => { + if let Some(state0) = option { + let _ = self + .swarm + .start_execution_setup(self.alice_peer_id, state0, self.bitcoin_wallet.clone()); + } + }, + encrypted_signature = self.send_encrypted_signature.recv().fuse() => { + if let Some(tx_redeem_encsig) = encrypted_signature { + self.swarm.send_encrypted_signature(self.alice_peer_id, tx_redeem_encsig); + } + } + } + } } } @@ -36,9 +170,7 @@ impl Default for Channels { pub struct EventLoopHandle { start_execution_setup: Sender, done_execution_setup: Receiver>, - recv_transfer_proof: Receiver, - conn_established: Receiver, - dial_alice: Sender<()>, + recv_transfer_proof: Receiver, send_encrypted_signature: Sender, request_spot_price: Sender, recv_spot_price: Receiver, @@ -56,26 +188,13 @@ impl EventLoopHandle { .ok_or_else(|| anyhow!("Failed to setup execution with Alice"))? } - pub async fn recv_transfer_proof(&mut self) -> Result { + pub async fn recv_transfer_proof(&mut self) -> Result { self.recv_transfer_proof .recv() .await .ok_or_else(|| anyhow!("Failed to receive transfer proof from Alice")) } - /// Dials other party and wait for the connection to be established. - /// Do nothing if we are already connected - pub async fn dial(&mut self) -> Result<()> { - let _ = self.dial_alice.send(()).await?; - - self.conn_established - .recv() - .await - .ok_or_else(|| anyhow!("Failed to receive connection established from Alice"))?; - - Ok(()) - } - pub async fn request_spot_price(&mut self, btc: bitcoin::Amount) -> Result { let _ = self .request_spot_price @@ -113,156 +232,21 @@ impl EventLoopHandle { } } -#[allow(missing_debug_implementations)] -pub struct EventLoop { - swarm: libp2p::Swarm, - bitcoin_wallet: Arc, - alice_peer_id: PeerId, - request_spot_price: Receiver, - recv_spot_price: Sender, - start_execution_setup: Receiver, - done_execution_setup: Sender>, - recv_transfer_proof: Sender, - dial_alice: Receiver<()>, - conn_established: Sender, - send_encrypted_signature: Receiver, - request_quote: Receiver<()>, - recv_quote: Sender, +#[derive(Debug)] +struct Channels { + sender: Sender, + receiver: Receiver, } -impl EventLoop { - pub fn new( - identity: &libp2p::core::identity::Keypair, - alice_peer_id: PeerId, - alice_addr: Multiaddr, - bitcoin_wallet: Arc, - ) -> Result<(Self, EventLoopHandle)> { - let behaviour = Behaviour::default(); - let transport = transport::build(identity)?; - - let mut swarm = libp2p::swarm::SwarmBuilder::new( - transport, - behaviour, - identity.public().into_peer_id(), - ) - .executor(Box::new(TokioExecutor { - handle: tokio::runtime::Handle::current(), - })) - .build(); - - swarm.add_address(alice_peer_id, alice_addr); - - let start_execution_setup = Channels::new(); - let done_execution_setup = Channels::new(); - let recv_transfer_proof = Channels::new(); - let dial_alice = Channels::new(); - let conn_established = Channels::new(); - let send_encrypted_signature = Channels::new(); - let request_spot_price = Channels::new(); - let recv_spot_price = Channels::new(); - let request_quote = Channels::new(); - let recv_quote = Channels::new(); - - let event_loop = EventLoop { - swarm, - alice_peer_id, - bitcoin_wallet, - start_execution_setup: start_execution_setup.receiver, - done_execution_setup: done_execution_setup.sender, - recv_transfer_proof: recv_transfer_proof.sender, - conn_established: conn_established.sender, - dial_alice: dial_alice.receiver, - send_encrypted_signature: send_encrypted_signature.receiver, - request_spot_price: request_spot_price.receiver, - recv_spot_price: recv_spot_price.sender, - request_quote: request_quote.receiver, - recv_quote: recv_quote.sender, - }; - - let handle = EventLoopHandle { - start_execution_setup: start_execution_setup.sender, - done_execution_setup: done_execution_setup.receiver, - recv_transfer_proof: recv_transfer_proof.receiver, - conn_established: conn_established.receiver, - dial_alice: dial_alice.sender, - send_encrypted_signature: send_encrypted_signature.sender, - request_spot_price: request_spot_price.sender, - recv_spot_price: recv_spot_price.receiver, - request_quote: request_quote.sender, - recv_quote: recv_quote.receiver, - }; - - Ok((event_loop, handle)) - } - - pub async fn run(mut self) -> Result { - loop { - tokio::select! { - swarm_event = self.swarm.next().fuse() => { - match swarm_event { - OutEvent::ConnectionEstablished(peer_id) => { - let _ = self.conn_established.send(peer_id).await; - } - OutEvent::SpotPriceReceived(msg) => { - let _ = self.recv_spot_price.send(msg).await; - }, - OutEvent::QuoteReceived(msg) => { - let _ = self.recv_quote.send(msg).await; - }, - OutEvent::ExecutionSetupDone(res) => { - let _ = self.done_execution_setup.send(res.map(|state|*state)).await; - } - OutEvent::TransferProof{ msg, channel }=> { - let _ = self.recv_transfer_proof.send(*msg).await; - // Send back empty response so that the request/response protocol completes. - if let Err(error) = self.swarm.transfer_proof.send_ack(channel) { - error!("Failed to send Transfer Proof ack: {:?}", error); - } - } - OutEvent::EncryptedSignatureAcknowledged => { - debug!("Alice acknowledged encrypted signature"); - } - OutEvent::ResponseSent => {} - OutEvent::CommunicationError(err) => { - bail!("Communication error: {:#}", err) - } - } - }, - option = self.dial_alice.recv().fuse() => { - if option.is_some() { - let peer_id = self.alice_peer_id; - if self.swarm.pt.is_connected(&peer_id) { - trace!("Already connected to Alice at {}", peer_id); - let _ = self.conn_established.send(peer_id).await; - } else { - debug!("Dialing alice at {}", peer_id); - libp2p::Swarm::dial(&mut self.swarm, &peer_id).context("Failed to dial alice")?; - } - } - }, - spot_price_request = self.request_spot_price.recv().fuse() => { - if let Some(request) = spot_price_request { - self.swarm.request_spot_price(self.alice_peer_id, request); - } - }, - quote_request = self.request_quote.recv().fuse() => { - if quote_request.is_some() { - self.swarm.request_quote(self.alice_peer_id); - } - }, - option = self.start_execution_setup.recv().fuse() => { - if let Some(state0) = option { - let _ = self - .swarm - .start_execution_setup(self.alice_peer_id, state0, self.bitcoin_wallet.clone()); - } - }, - encrypted_signature = self.send_encrypted_signature.recv().fuse() => { - if let Some(tx_redeem_encsig) = encrypted_signature { - self.swarm.send_encrypted_signature(self.alice_peer_id, tx_redeem_encsig); - } - } - } - } +impl Channels { + fn new() -> Channels { + let (sender, receiver) = tokio::sync::mpsc::channel(100); + Channels { sender, receiver } + } +} + +impl Default for Channels { + fn default() -> Self { + Self::new() } } diff --git a/swap/src/protocol/bob/execution_setup.rs b/swap/src/protocol/bob/execution_setup.rs index f2106737..6fa7491e 100644 --- a/swap/src/protocol/bob/execution_setup.rs +++ b/swap/src/protocol/bob/execution_setup.rs @@ -1,5 +1,5 @@ use crate::bitcoin::Signature; -use crate::network::request_response::BUF_SIZE; +use crate::network::cbor_request_response::BUF_SIZE; use crate::protocol::alice::{Message1, Message3}; use crate::protocol::bob::{State0, State2}; use anyhow::{Context, Error, Result}; diff --git a/swap/src/protocol/bob/state.rs b/swap/src/protocol/bob/state.rs index 28aa05a0..b2703a51 100644 --- a/swap/src/protocol/bob/state.rs +++ b/swap/src/protocol/bob/state.rs @@ -7,7 +7,7 @@ use crate::monero::wallet::WatchRequest; use crate::monero::{monero_private_key, TransferProof}; use crate::monero_ext::ScalarExt; use crate::protocol::alice::{Message1, Message3}; -use crate::protocol::bob::{EncryptedSignature, Message0, Message2, Message4}; +use crate::protocol::bob::{Message0, Message2, Message4}; use crate::protocol::CROSS_CURVE_PROOF_SYSTEM; use anyhow::{anyhow, bail, Context, Result}; use ecdsa_fun::adaptor::{Adaptor, HashTranscript}; @@ -404,12 +404,6 @@ pub struct State4 { } impl State4 { - pub fn next_message(&self) -> EncryptedSignature { - EncryptedSignature { - tx_redeem_encsig: self.tx_redeem_encsig(), - } - } - pub fn tx_redeem_encsig(&self) -> bitcoin::EncryptedSignature { let tx_redeem = bitcoin::TxRedeem::new(&self.tx_lock, &self.redeem_address); self.b.encsign(self.S_a_bitcoin, tx_redeem.digest()) diff --git a/swap/src/protocol/bob/swap.rs b/swap/src/protocol/bob/swap.rs index f13e5ffc..cb264750 100644 --- a/swap/src/protocol/bob/swap.rs +++ b/swap/src/protocol/bob/swap.rs @@ -69,8 +69,6 @@ async fn run_until_internal( BobState::Started { btc_amount } => { let bitcoin_refund_address = bitcoin_wallet.new_address().await?; - event_loop_handle.dial().await?; - let state2 = request_price_and_setup( btc_amount, &mut event_loop_handle, @@ -82,8 +80,6 @@ async fn run_until_internal( BobState::ExecutionSetupDone(state2) } BobState::ExecutionSetupDone(state2) => { - // Do not lock Bitcoin if not connected to Alice. - event_loop_handle.dial().await?; // Alice and Bob have exchanged info let (state3, tx_lock) = state2.lock_btc().await?; let signed_tx = bitcoin_wallet @@ -98,8 +94,6 @@ async fn run_until_internal( // Watch for Alice to Lock Xmr or for cancel timelock to elapse BobState::BtcLocked(state3) => { if let ExpiredTimelocks::None = state3.current_epoch(bitcoin_wallet.as_ref()).await? { - event_loop_handle.dial().await?; - let transfer_proof_watcher = event_loop_handle.recv_transfer_proof(); let cancel_timelock_expires = state3.wait_for_cancel_timelock_to_expire(bitcoin_wallet.as_ref()); @@ -140,8 +134,6 @@ async fn run_until_internal( monero_wallet_restore_blockheight, } => { if let ExpiredTimelocks::None = state.current_epoch(bitcoin_wallet.as_ref()).await? { - event_loop_handle.dial().await?; - let watch_request = state.lock_xmr_watch_request(lock_transfer_proof); select! { @@ -166,7 +158,6 @@ async fn run_until_internal( } BobState::XmrLocked(state) => { if let ExpiredTimelocks::None = state.expired_timelock(bitcoin_wallet.as_ref()).await? { - event_loop_handle.dial().await?; // Alice has locked Xmr // Bob sends Alice his key diff --git a/swap/src/protocol/bob/transfer_proof.rs b/swap/src/protocol/bob/transfer_proof.rs deleted file mode 100644 index 7b1ff1be..00000000 --- a/swap/src/protocol/bob/transfer_proof.rs +++ /dev/null @@ -1,85 +0,0 @@ -use crate::network::request_response::{CborCodec, TransferProofProtocol, TIMEOUT}; -use crate::protocol::alice::TransferProof; -use anyhow::{anyhow, Error, Result}; -use libp2p::request_response::{ - ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, - RequestResponseMessage, ResponseChannel, -}; -use libp2p::NetworkBehaviour; -use std::time::Duration; -use tracing::debug; - -#[derive(Debug)] -pub enum OutEvent { - MsgReceived { - msg: TransferProof, - channel: ResponseChannel<()>, - }, - AckSent, - Failure(Error), -} - -/// A `NetworkBehaviour` that represents receiving the transfer proof from -/// Alice. -#[derive(NetworkBehaviour)] -#[behaviour(out_event = "OutEvent", event_process = false)] -#[allow(missing_debug_implementations)] -pub struct Behaviour { - rr: RequestResponse>, -} - -impl Behaviour { - pub fn send_ack(&mut self, channel: ResponseChannel<()>) -> Result<()> { - self.rr - .send_response(channel, ()) - .map_err(|err| anyhow!("Failed to ack transfer proof: {:?}", err)) - } -} - -impl Default for Behaviour { - fn default() -> Self { - let timeout = Duration::from_secs(TIMEOUT); - let mut config = RequestResponseConfig::default(); - config.set_request_timeout(timeout); - - Self { - rr: RequestResponse::new( - CborCodec::default(), - vec![(TransferProofProtocol, ProtocolSupport::Inbound)], - config, - ), - } - } -} - -impl From> for OutEvent { - fn from(event: RequestResponseEvent) -> Self { - match event { - RequestResponseEvent::Message { - peer, - message: - RequestResponseMessage::Request { - request, channel, .. - }, - .. - } => { - debug!("Received Transfer Proof from {}", peer); - OutEvent::MsgReceived { - msg: request, - channel, - } - } - RequestResponseEvent::Message { - message: RequestResponseMessage::Response { .. }, - .. - } => OutEvent::Failure(anyhow!("Bob should not get a Response")), - RequestResponseEvent::InboundFailure { error, .. } => { - OutEvent::Failure(anyhow!("Inbound failure: {:?}", error)) - } - RequestResponseEvent::OutboundFailure { error, .. } => { - OutEvent::Failure(anyhow!("Outbound failure: {:?}", error)) - } - RequestResponseEvent::ResponseSent { .. } => OutEvent::AckSent, - } - } -} diff --git a/swap/src/seed.rs b/swap/src/seed.rs index 7432adc5..1536959d 100644 --- a/swap/src/seed.rs +++ b/swap/src/seed.rs @@ -15,7 +15,7 @@ use std::path::{Path, PathBuf}; pub const SEED_LENGTH: usize = 32; -#[derive(Clone, Copy, Eq, PartialEq)] +#[derive(Eq, PartialEq)] pub struct Seed([u8; SEED_LENGTH]); impl Seed { diff --git a/swap/tests/happy_path_restart_bob_before_comm.rs b/swap/tests/happy_path_restart_bob_after_xmr_locked.rs similarity index 100% rename from swap/tests/happy_path_restart_bob_before_comm.rs rename to swap/tests/happy_path_restart_bob_after_xmr_locked.rs diff --git a/swap/tests/happy_path_restart_bob_before_xmr_locked.rs b/swap/tests/happy_path_restart_bob_before_xmr_locked.rs new file mode 100644 index 00000000..6abba569 --- /dev/null +++ b/swap/tests/happy_path_restart_bob_before_xmr_locked.rs @@ -0,0 +1,34 @@ +pub mod testutils; + +use swap::protocol::bob::BobState; +use swap::protocol::{alice, bob}; +use testutils::bob_run_until::is_xmr_locked; +use testutils::SlowCancelConfig; + +#[tokio::test] +async fn given_bob_restarts_after_xmr_is_locked_resume_swap() { + testutils::setup_test(SlowCancelConfig, |mut ctx| async move { + let (bob_swap, bob_join_handle) = ctx.bob_swap().await; + let bob_swap = tokio::spawn(bob::run_until(bob_swap, is_xmr_locked)); + + let alice_swap = ctx.alice_next_swap().await; + let alice_swap = tokio::spawn(alice::run(alice_swap)); + + let bob_state = bob_swap.await??; + + assert!(matches!(bob_state, BobState::XmrLocked { .. })); + + let (bob_swap, _) = ctx.stop_and_resume_bob_from_db(bob_join_handle).await; + assert!(matches!(bob_swap.state, BobState::XmrLocked { .. })); + + let bob_state = bob::run(bob_swap).await?; + + ctx.assert_bob_redeemed(bob_state).await; + + let alice_state = alice_swap.await??; + ctx.assert_alice_redeemed(alice_state).await; + + Ok(()) + }) + .await; +} diff --git a/swap/tests/testutils/mod.rs b/swap/tests/testutils/mod.rs index d9070d8e..fb442c2d 100644 --- a/swap/tests/testutils/mod.rs +++ b/swap/tests/testutils/mod.rs @@ -7,9 +7,8 @@ use bitcoin_harness::{BitcoindRpcApi, Client}; use futures::Future; use get_port::get_port; use libp2p::core::Multiaddr; -use libp2p::PeerId; +use libp2p::{PeerId, Swarm}; use monero_harness::{image, Monero}; -use std::convert::Infallible; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; @@ -17,6 +16,7 @@ use swap::asb::FixedRate; use swap::bitcoin::{CancelTimelock, PunishTimelock}; use swap::database::Database; use swap::env::{Config, GetConfig}; +use swap::network::swarm; use swap::protocol::alice::{AliceState, Swap}; use swap::protocol::bob::BobState; use swap::protocol::{alice, bob}; @@ -43,7 +43,6 @@ pub struct StartingBalances { pub btc: bitcoin::Amount, } -#[derive(Clone)] struct BobParams { seed: Seed, db_path: PathBuf, @@ -71,16 +70,14 @@ impl BobParams { } pub fn new_eventloop(&self) -> Result<(bob::EventLoop, bob::EventLoopHandle)> { - bob::EventLoop::new( - &self.seed.derive_libp2p_identity(), - self.alice_peer_id, - self.alice_address.clone(), - self.bitcoin_wallet.clone(), - ) + let mut swarm = swarm::new::(&self.seed)?; + swarm.add_address(self.alice_peer_id, self.alice_address.clone()); + + bob::EventLoop::new(swarm, self.alice_peer_id, self.bitcoin_wallet.clone()) } } -pub struct BobEventLoopJoinHandle(JoinHandle>); +pub struct BobEventLoopJoinHandle(JoinHandle<()>); impl BobEventLoopJoinHandle { pub fn abort(&self) { @@ -385,9 +382,11 @@ where ) .await; + let mut alice_swarm = swarm::new::(&alice_seed).unwrap(); + Swarm::listen_on(&mut alice_swarm, alice_listen_address.clone()).unwrap(); + let (alice_event_loop, alice_swap_handle) = alice::EventLoop::new( - alice_listen_address.clone(), - alice_seed, + alice_swarm, env_config, alice_bitcoin_wallet.clone(), alice_monero_wallet.clone(),