Move common stuff to network::swap_setup

This commit is contained in:
Thomas Eizinger 2021-06-24 20:07:27 +10:00
parent 981f9fd704
commit fd23884d74
No known key found for this signature in database
GPG Key ID: 651AC83A6C6C8B96
2 changed files with 102 additions and 91 deletions

View File

@ -1,3 +1,10 @@
use libp2p::core::upgrade;
use libp2p::swarm::NegotiatedSubstream;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
pub const BUF_SIZE: usize = 1024 * 1024;
pub mod protocol {
use futures::future;
use libp2p::core::upgrade::{from_fn, FromFnUpgrade};
@ -24,3 +31,76 @@ pub mod protocol {
>,
>;
}
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)]
pub struct BlockchainNetwork {
#[serde(with = "crate::bitcoin::network")]
pub bitcoin: bitcoin::Network,
#[serde(with = "crate::monero::network")]
pub monero: monero::Network,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SpotPriceRequest {
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
pub btc: bitcoin::Amount,
pub blockchain_network: BlockchainNetwork,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum SpotPriceResponse {
Xmr(crate::monero::Amount),
Error(SpotPriceError),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum SpotPriceError {
NoSwapsAccepted,
AmountBelowMinimum {
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
min: bitcoin::Amount,
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
buy: bitcoin::Amount,
},
AmountAboveMaximum {
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
max: bitcoin::Amount,
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
buy: bitcoin::Amount,
},
BalanceTooLow {
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
buy: bitcoin::Amount,
},
BlockchainNetworkMismatch {
cli: BlockchainNetwork,
asb: BlockchainNetwork,
},
/// To be used for errors that cannot be explained on the CLI side (e.g.
/// rate update problems on the seller side)
Other,
}
pub async fn read_cbor_message<T>(substream: &mut NegotiatedSubstream) -> anyhow::Result<T>
where
T: DeserializeOwned,
{
let bytes = upgrade::read_one(substream, BUF_SIZE).await?;
let mut de = serde_cbor::Deserializer::from_slice(&bytes);
let message = T::deserialize(&mut de)?;
Ok(message)
}
pub async fn write_cbor_message<T>(
substream: &mut NegotiatedSubstream,
message: T,
) -> anyhow::Result<()>
where
T: Serialize,
{
let bytes = serde_cbor::to_vec(&message)?;
upgrade::write_one(substream, &bytes).await?;
Ok(())
}

View File

@ -1,8 +1,8 @@
use crate::network::swap_setup::protocol;
use crate::protocol::alice::event_loop::LatestRate;
use crate::protocol::alice::{State0, State3};
use crate::protocol::{alice, Message0, Message2, Message4};
use crate::{bitcoin, env, monero};
use std::collections::VecDeque;
use std::fmt::Debug;
use std::task::{Context, Poll};
use std::time::Duration;
use anyhow::{anyhow, Context as _, Result};
use futures::future::{BoxFuture, OptionFuture};
use futures::FutureExt;
@ -13,15 +13,18 @@ use libp2p::swarm::{
ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
};
use libp2p::{Multiaddr, PeerId};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::fmt::Debug;
use std::task::{Context, Poll};
use std::time::Duration;
use uuid::Uuid;
use void::Void;
use crate::network::swap_setup;
use crate::network::swap_setup::{
protocol, BlockchainNetwork, SpotPriceError, SpotPriceRequest, SpotPriceResponse,
};
use crate::protocol::alice::event_loop::LatestRate;
use crate::protocol::alice::{State0, State3};
use crate::protocol::{alice, Message0, Message2, Message4};
use crate::{bitcoin, env, monero};
#[derive(Debug)]
pub enum OutEvent {
Initiated {
@ -104,14 +107,6 @@ impl From<OutEvent> for alice::OutEvent {
}
}
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)]
pub struct BlockchainNetwork {
#[serde(with = "crate::bitcoin::network")]
pub bitcoin: bitcoin::Network,
#[serde(with = "crate::monero::network")]
pub monero: monero::Network,
}
#[allow(missing_debug_implementations)]
pub struct Behaviour<LR> {
events: VecDeque<OutEvent>,
@ -204,47 +199,6 @@ where
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SpotPriceRequest {
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
pub btc: bitcoin::Amount,
pub blockchain_network: BlockchainNetwork,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum SpotPriceResponse {
Xmr(monero::Amount),
Error(SpotPriceError),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum SpotPriceError {
NoSwapsAccepted,
AmountBelowMinimum {
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
min: bitcoin::Amount,
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
buy: bitcoin::Amount,
},
AmountAboveMaximum {
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
max: bitcoin::Amount,
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
buy: bitcoin::Amount,
},
BalanceTooLow {
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
buy: bitcoin::Amount,
},
BlockchainNetworkMismatch {
cli: BlockchainNetwork,
asb: BlockchainNetwork,
},
/// To be used for errors that cannot be explained on the CLI side (e.g.
/// rate update problems on the seller side)
Other,
}
// TODO: This is bob only.
// enum OutboundState {
// PendingOpen(
@ -298,8 +252,6 @@ pub enum HandlerOutEvent {
pub enum HandlerInEvent {}
pub const BUF_SIZE: usize = 1024 * 1024;
impl<LR> ProtocolsHandler for Handler<LR>
where
LR: LatestRate + Send + 'static,
@ -332,7 +284,7 @@ where
let env_config = self.env_config;
let protocol = tokio::time::timeout(self.timeout, async move {
let request = read_cbor_message::<SpotPriceRequest>(&mut substream)
let request = swap_setup::read_cbor_message::<SpotPriceRequest>(&mut substream)
.await
.map_err(|e| Error::Io(e))?;
let wallet_snapshot = sender
@ -392,14 +344,14 @@ where
let xmr = match validate.await {
Ok(xmr) => {
write_cbor_message(&mut substream, SpotPriceResponse::Xmr(xmr))
swap_setup::write_cbor_message(&mut substream, SpotPriceResponse::Xmr(xmr))
.await
.map_err(|e| Error::Io(e))?;
xmr
}
Err(e) => {
write_cbor_message(
swap_setup::write_cbor_message(
&mut substream,
SpotPriceResponse::Error(e.to_error_response()),
)
@ -420,17 +372,17 @@ where
&mut rand::thread_rng(),
);
let message0 = read_cbor_message::<Message0>(&mut substream)
let message0 = swap_setup::read_cbor_message::<Message0>(&mut substream)
.await
.context("Failed to deserialize message0")
.map_err(|e| Error::Io(e))?;
let (swap_id, state1) = state0.receive(message0).map_err(|e| Error::Io(e))?;
write_cbor_message(&mut substream, state1.next_message())
swap_setup::write_cbor_message(&mut substream, state1.next_message())
.await
.map_err(|e| Error::Io(e))?;
let message2 = read_cbor_message::<Message2>(&mut substream)
let message2 = swap_setup::read_cbor_message::<Message2>(&mut substream)
.await
.context("Failed to deserialize message2")
.map_err(|e| Error::Io(e))?;
@ -439,11 +391,11 @@ where
.context("Failed to receive Message2")
.map_err(|e| Error::Io(e))?;
write_cbor_message(&mut substream, state2.next_message())
swap_setup::write_cbor_message(&mut substream, state2.next_message())
.await
.map_err(|e| Error::Io(e))?;
let message4 = read_cbor_message::<Message4>(&mut substream)
let message4 = swap_setup::read_cbor_message::<Message4>(&mut substream)
.await
.context("Failed to deserialize message4")
.map_err(|e| Error::Io(e))?;
@ -509,27 +461,6 @@ where
}
}
async fn read_cbor_message<T>(substream: &mut NegotiatedSubstream) -> Result<T>
where
T: DeserializeOwned,
{
let bytes = upgrade::read_one(substream, BUF_SIZE).await?;
let mut de = serde_cbor::Deserializer::from_slice(&bytes);
let message = T::deserialize(&mut de)?;
Ok(message)
}
async fn write_cbor_message<T>(substream: &mut NegotiatedSubstream, message: T) -> Result<()>
where
T: Serialize,
{
let bytes = serde_cbor::to_vec(&message)?;
upgrade::write_one(substream, &bytes).await?;
Ok(())
}
// TODO: Differentiate between errors that we send back and shit that happens on
// our side (IO, timeout)
#[derive(Debug, thiserror::Error)]