diff --git a/swap/src/network/swap_setup.rs b/swap/src/network/swap_setup.rs index b34963f1..33919fdc 100644 --- a/swap/src/network/swap_setup.rs +++ b/swap/src/network/swap_setup.rs @@ -1,3 +1,10 @@ +use libp2p::core::upgrade; +use libp2p::swarm::NegotiatedSubstream; +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; + +pub const BUF_SIZE: usize = 1024 * 1024; + pub mod protocol { use futures::future; use libp2p::core::upgrade::{from_fn, FromFnUpgrade}; @@ -24,3 +31,76 @@ pub mod protocol { >, >; } + +#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)] +pub struct BlockchainNetwork { + #[serde(with = "crate::bitcoin::network")] + pub bitcoin: bitcoin::Network, + #[serde(with = "crate::monero::network")] + pub monero: monero::Network, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct SpotPriceRequest { + #[serde(with = "::bitcoin::util::amount::serde::as_sat")] + pub btc: bitcoin::Amount, + pub blockchain_network: BlockchainNetwork, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum SpotPriceResponse { + Xmr(crate::monero::Amount), + Error(SpotPriceError), +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum SpotPriceError { + NoSwapsAccepted, + AmountBelowMinimum { + #[serde(with = "::bitcoin::util::amount::serde::as_sat")] + min: bitcoin::Amount, + #[serde(with = "::bitcoin::util::amount::serde::as_sat")] + buy: bitcoin::Amount, + }, + AmountAboveMaximum { + #[serde(with = "::bitcoin::util::amount::serde::as_sat")] + max: bitcoin::Amount, + #[serde(with = "::bitcoin::util::amount::serde::as_sat")] + buy: bitcoin::Amount, + }, + BalanceTooLow { + #[serde(with = "::bitcoin::util::amount::serde::as_sat")] + buy: bitcoin::Amount, + }, + BlockchainNetworkMismatch { + cli: BlockchainNetwork, + asb: BlockchainNetwork, + }, + /// To be used for errors that cannot be explained on the CLI side (e.g. + /// rate update problems on the seller side) + Other, +} + +pub async fn read_cbor_message(substream: &mut NegotiatedSubstream) -> anyhow::Result +where + T: DeserializeOwned, +{ + let bytes = upgrade::read_one(substream, BUF_SIZE).await?; + let mut de = serde_cbor::Deserializer::from_slice(&bytes); + let message = T::deserialize(&mut de)?; + + Ok(message) +} + +pub async fn write_cbor_message( + substream: &mut NegotiatedSubstream, + message: T, +) -> anyhow::Result<()> +where + T: Serialize, +{ + let bytes = serde_cbor::to_vec(&message)?; + upgrade::write_one(substream, &bytes).await?; + + Ok(()) +} diff --git a/swap/src/protocol/alice/swap_setup.rs b/swap/src/protocol/alice/swap_setup.rs index fff749a3..54e645d7 100644 --- a/swap/src/protocol/alice/swap_setup.rs +++ b/swap/src/protocol/alice/swap_setup.rs @@ -1,8 +1,8 @@ -use crate::network::swap_setup::protocol; -use crate::protocol::alice::event_loop::LatestRate; -use crate::protocol::alice::{State0, State3}; -use crate::protocol::{alice, Message0, Message2, Message4}; -use crate::{bitcoin, env, monero}; +use std::collections::VecDeque; +use std::fmt::Debug; +use std::task::{Context, Poll}; +use std::time::Duration; + use anyhow::{anyhow, Context as _, Result}; use futures::future::{BoxFuture, OptionFuture}; use futures::FutureExt; @@ -13,15 +13,18 @@ use libp2p::swarm::{ ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, }; use libp2p::{Multiaddr, PeerId}; -use serde::de::DeserializeOwned; -use serde::{Deserialize, Serialize}; -use std::collections::VecDeque; -use std::fmt::Debug; -use std::task::{Context, Poll}; -use std::time::Duration; use uuid::Uuid; use void::Void; +use crate::network::swap_setup; +use crate::network::swap_setup::{ + protocol, BlockchainNetwork, SpotPriceError, SpotPriceRequest, SpotPriceResponse, +}; +use crate::protocol::alice::event_loop::LatestRate; +use crate::protocol::alice::{State0, State3}; +use crate::protocol::{alice, Message0, Message2, Message4}; +use crate::{bitcoin, env, monero}; + #[derive(Debug)] pub enum OutEvent { Initiated { @@ -104,14 +107,6 @@ impl From for alice::OutEvent { } } -#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)] -pub struct BlockchainNetwork { - #[serde(with = "crate::bitcoin::network")] - pub bitcoin: bitcoin::Network, - #[serde(with = "crate::monero::network")] - pub monero: monero::Network, -} - #[allow(missing_debug_implementations)] pub struct Behaviour { events: VecDeque, @@ -204,47 +199,6 @@ where } } -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct SpotPriceRequest { - #[serde(with = "::bitcoin::util::amount::serde::as_sat")] - pub btc: bitcoin::Amount, - pub blockchain_network: BlockchainNetwork, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum SpotPriceResponse { - Xmr(monero::Amount), - Error(SpotPriceError), -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum SpotPriceError { - NoSwapsAccepted, - AmountBelowMinimum { - #[serde(with = "::bitcoin::util::amount::serde::as_sat")] - min: bitcoin::Amount, - #[serde(with = "::bitcoin::util::amount::serde::as_sat")] - buy: bitcoin::Amount, - }, - AmountAboveMaximum { - #[serde(with = "::bitcoin::util::amount::serde::as_sat")] - max: bitcoin::Amount, - #[serde(with = "::bitcoin::util::amount::serde::as_sat")] - buy: bitcoin::Amount, - }, - BalanceTooLow { - #[serde(with = "::bitcoin::util::amount::serde::as_sat")] - buy: bitcoin::Amount, - }, - BlockchainNetworkMismatch { - cli: BlockchainNetwork, - asb: BlockchainNetwork, - }, - /// To be used for errors that cannot be explained on the CLI side (e.g. - /// rate update problems on the seller side) - Other, -} - // TODO: This is bob only. // enum OutboundState { // PendingOpen( @@ -298,8 +252,6 @@ pub enum HandlerOutEvent { pub enum HandlerInEvent {} -pub const BUF_SIZE: usize = 1024 * 1024; - impl ProtocolsHandler for Handler where LR: LatestRate + Send + 'static, @@ -332,7 +284,7 @@ where let env_config = self.env_config; let protocol = tokio::time::timeout(self.timeout, async move { - let request = read_cbor_message::(&mut substream) + let request = swap_setup::read_cbor_message::(&mut substream) .await .map_err(|e| Error::Io(e))?; let wallet_snapshot = sender @@ -392,14 +344,14 @@ where let xmr = match validate.await { Ok(xmr) => { - write_cbor_message(&mut substream, SpotPriceResponse::Xmr(xmr)) + swap_setup::write_cbor_message(&mut substream, SpotPriceResponse::Xmr(xmr)) .await .map_err(|e| Error::Io(e))?; xmr } Err(e) => { - write_cbor_message( + swap_setup::write_cbor_message( &mut substream, SpotPriceResponse::Error(e.to_error_response()), ) @@ -420,17 +372,17 @@ where &mut rand::thread_rng(), ); - let message0 = read_cbor_message::(&mut substream) + let message0 = swap_setup::read_cbor_message::(&mut substream) .await .context("Failed to deserialize message0") .map_err(|e| Error::Io(e))?; let (swap_id, state1) = state0.receive(message0).map_err(|e| Error::Io(e))?; - write_cbor_message(&mut substream, state1.next_message()) + swap_setup::write_cbor_message(&mut substream, state1.next_message()) .await .map_err(|e| Error::Io(e))?; - let message2 = read_cbor_message::(&mut substream) + let message2 = swap_setup::read_cbor_message::(&mut substream) .await .context("Failed to deserialize message2") .map_err(|e| Error::Io(e))?; @@ -439,11 +391,11 @@ where .context("Failed to receive Message2") .map_err(|e| Error::Io(e))?; - write_cbor_message(&mut substream, state2.next_message()) + swap_setup::write_cbor_message(&mut substream, state2.next_message()) .await .map_err(|e| Error::Io(e))?; - let message4 = read_cbor_message::(&mut substream) + let message4 = swap_setup::read_cbor_message::(&mut substream) .await .context("Failed to deserialize message4") .map_err(|e| Error::Io(e))?; @@ -509,27 +461,6 @@ where } } -async fn read_cbor_message(substream: &mut NegotiatedSubstream) -> Result -where - T: DeserializeOwned, -{ - let bytes = upgrade::read_one(substream, BUF_SIZE).await?; - let mut de = serde_cbor::Deserializer::from_slice(&bytes); - let message = T::deserialize(&mut de)?; - - Ok(message) -} - -async fn write_cbor_message(substream: &mut NegotiatedSubstream, message: T) -> Result<()> -where - T: Serialize, -{ - let bytes = serde_cbor::to_vec(&message)?; - upgrade::write_one(substream, &bytes).await?; - - Ok(()) -} - // TODO: Differentiate between errors that we send back and shit that happens on // our side (IO, timeout) #[derive(Debug, thiserror::Error)]