This commit is contained in:
Thomas Eizinger 2021-06-24 15:43:01 +10:00
parent 144145fc06
commit a3e6f75eb1
No known key found for this signature in database
GPG Key ID: 651AC83A6C6C8B96
7 changed files with 339 additions and 304 deletions

View File

@ -81,6 +81,9 @@ pub struct PublicViewKey(PublicKey);
#[derive(Debug, Copy, Clone, Deserialize, Serialize, PartialEq, PartialOrd)] #[derive(Debug, Copy, Clone, Deserialize, Serialize, PartialEq, PartialOrd)]
pub struct Amount(u64); pub struct Amount(u64);
// Median tx fees on Monero as found here: https://www.monero.how/monero-transaction-fees, XMR 0.000_015 * 2 (to be on the safe side)
pub const MONERO_FEE: Amount = Amount::from_piconero(30000000);
impl Amount { impl Amount {
pub const ZERO: Self = Self(0); pub const ZERO: Self = Self(0);
pub const ONE_XMR: Self = Self(PICONERO_OFFSET); pub const ONE_XMR: Self = Self(PICONERO_OFFSET);
@ -88,7 +91,7 @@ impl Amount {
/// piconeros. /// piconeros.
/// ///
/// A piconero (a.k.a atomic unit) is equal to 1e-12 XMR. /// A piconero (a.k.a atomic unit) is equal to 1e-12 XMR.
pub fn from_piconero(amount: u64) -> Self { pub const fn from_piconero(amount: u64) -> Self {
Amount(amount) Amount(amount)
} }

View File

@ -276,11 +276,6 @@ impl Wallet {
pub async fn refresh(&self) -> Result<Refreshed> { pub async fn refresh(&self) -> Result<Refreshed> {
Ok(self.inner.lock().await.refresh().await?) Ok(self.inner.lock().await.refresh().await?)
} }
pub fn static_tx_fee_estimate(&self) -> Amount {
// Median tx fees on Monero as found here: https://www.monero.how/monero-transaction-fees, 0.000_015 * 2 (to be on the safe side)
Amount::from_monero(0.000_03f64).expect("static fee to be convertible without problems")
}
} }
#[derive(Debug)] #[derive(Debug)]

View File

@ -1,19 +1,21 @@
//! Run an XMR/BTC swap in the role of Alice. //! Run an XMR/BTC swap in the role of Alice.
//! Alice holds XMR and wishes receive BTC. //! Alice holds XMR and wishes receive BTC.
use std::sync::Arc;
use uuid::Uuid;
use crate::{bitcoin, monero};
use crate::database::Database; use crate::database::Database;
use crate::env::Config; use crate::env::Config;
use crate::{bitcoin, monero};
use std::sync::Arc;
use uuid::Uuid;
pub use self::behaviour::{Behaviour, OutEvent}; pub use self::behaviour::{Behaviour, OutEvent};
pub use self::event_loop::{EventLoop, EventLoopHandle}; pub use self::event_loop::{EventLoop, EventLoopHandle};
pub use self::recovery::{cancel, punish, redeem, refund, safely_abort};
pub use self::recovery::cancel::cancel; pub use self::recovery::cancel::cancel;
pub use self::recovery::punish::punish; pub use self::recovery::punish::punish;
pub use self::recovery::redeem::redeem; pub use self::recovery::redeem::redeem;
pub use self::recovery::refund::refund; pub use self::recovery::refund::refund;
pub use self::recovery::safely_abort::safely_abort; pub use self::recovery::safely_abort::safely_abort;
pub use self::recovery::{cancel, punish, redeem, refund, safely_abort};
pub use self::state::*; pub use self::state::*;
pub use self::swap::{run, run_until}; pub use self::swap::{run, run_until};
@ -21,9 +23,9 @@ mod behaviour;
pub mod event_loop; pub mod event_loop;
mod execution_setup; mod execution_setup;
mod recovery; mod recovery;
mod swap_setup;
pub mod state; pub mod state;
pub mod swap; pub mod swap;
pub mod swap_setup;
pub struct Swap { pub struct Swap {
pub state: AliceState, pub state: AliceState,

View File

@ -1,24 +1,25 @@
use crate::network::quote::BidQuote;
use crate::network::{encrypted_signature, quote, transfer_proof};
use crate::protocol::alice::event_loop::LatestRate;
use crate::protocol::alice::{execution_setup, swap_setup, State3};
use crate::{env, monero};
use anyhow::{anyhow, Error}; use anyhow::{anyhow, Error};
use libp2p::{NetworkBehaviour, PeerId};
use libp2p::ping::{Ping, PingEvent}; use libp2p::ping::{Ping, PingEvent};
use libp2p::request_response::{RequestId, ResponseChannel}; use libp2p::request_response::{RequestId, ResponseChannel};
use libp2p::{NetworkBehaviour, PeerId};
use uuid::Uuid; use uuid::Uuid;
use crate::{env, monero};
use crate::network::{encrypted_signature, quote, transfer_proof};
use crate::network::quote::BidQuote;
use crate::protocol::alice::{execution_setup, State3, swap_setup};
use crate::protocol::alice::event_loop::LatestRate;
use crate::protocol::alice::swap_setup::WalletSnapshot;
use tokio::sync::oneshot;
#[derive(Debug)] #[derive(Debug)]
pub enum OutEvent { pub enum OutEvent {
SwapRequestDeclined { SwapRequestDeclined {
peer: PeerId, peer: PeerId,
error: swap_setup::Error, error: swap_setup::Error,
}, },
ExecutionSetupStart { SwapInitiated {
peer: PeerId, send_wallet_snapshot: oneshot::Sender<WalletSnapshot>
btc: bitcoin::Amount,
xmr: monero::Amount,
}, },
QuoteRequested { QuoteRequested {
channel: ResponseChannel<BidQuote>, channel: ResponseChannel<BidQuote>,

View File

@ -20,6 +20,7 @@ use std::fmt::Debug;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use uuid::Uuid; use uuid::Uuid;
use crate::protocol::alice::swap_setup::WalletSnapshot;
/// A future that resolves to a tuple of `PeerId`, `transfer_proof::Request` and /// A future that resolves to a tuple of `PeerId`, `transfer_proof::Request` and
/// `Responder`. /// `Responder`.
@ -150,61 +151,38 @@ where
tokio::select! { tokio::select! {
swarm_event = self.swarm.next_event() => { swarm_event = self.swarm.next_event() => {
match swarm_event { match swarm_event {
SwarmEvent::Behaviour(OutEvent::ExecutionSetupStart { peer, btc, xmr }) => { SwarmEvent::Behaviour(OutEvent::SwapInitiated { send_wallet_snapshot }) => {
let tx_redeem_fee = self.bitcoin_wallet let wallet_snapshot = match WalletSnapshot::capture(&self.bitcoin_wallet, &self.monero_wallet).await {
.estimate_fee(bitcoin::TxRedeem::weight(), btc) Ok(wallet_snapshot) => wallet_snapshot,
.await;
let tx_punish_fee = self.bitcoin_wallet
.estimate_fee(bitcoin::TxPunish::weight(), btc)
.await;
let redeem_address = self.bitcoin_wallet.new_address().await;
let punish_address = self.bitcoin_wallet.new_address().await;
let (redeem_address, punish_address) = match (
redeem_address,
punish_address,
) {
(Ok(redeem_address), Ok(punish_address)) => {
(redeem_address, punish_address)
}
_ => {
tracing::error!(%peer, "Failed to get new address during execution setup");
continue;
}
};
let (tx_redeem_fee, tx_punish_fee) = match (
tx_redeem_fee,
tx_punish_fee,
) {
(Ok(tx_redeem_fee), Ok(tx_punish_fee)) => {
(tx_redeem_fee, tx_punish_fee)
}
_ => {
tracing::error!(%peer, "Failed to calculate transaction fees during execution setup");
continue;
}
};
let state0 = match State0::new(
btc,
xmr,
self.env_config,
redeem_address,
punish_address,
tx_redeem_fee,
tx_punish_fee,
&mut OsRng
) {
Ok(state) => state,
Err(error) => { Err(error) => {
tracing::warn!(%peer, "Failed to make State0 for execution setup. Error {:#}", error); tracing::error!("Swap request will be ignored because we were unable to create wallet snapshot for swap: {:#}", error)
continue;
} }
}; };
self.swarm.behaviour_mut().execution_setup.run(peer, state0); match send_wallet_snapshot.send().await {
Ok()
}
// TODO: Move into execution setup as part of swap_setup
// let state0 = match State0::new(
// btc,
// xmr,
// self.env_config,
// redeem_address,
// punish_address,
// tx_redeem_fee,
// tx_punish_fee,
// &mut OsRng
// ) {
// Ok(state) => state,
// Err(error) => {
// tracing::warn!(%peer, "Failed to make State0 for execution setup. Error {:#}", error);
// continue;
// }
// };
//
// self.swarm.behaviour_mut().execution_setup.run(peer, state0);
} }
SwarmEvent::Behaviour(OutEvent::SwapRequestDeclined { peer, error }) => { SwarmEvent::Behaviour(OutEvent::SwapRequestDeclined { peer, error }) => {
tracing::warn!(%peer, "Ignoring spot price request because: {}", error); tracing::warn!(%peer, "Ignoring spot price request because: {}", error);

View File

@ -52,36 +52,7 @@ impl Behaviour {
pub fn run(&mut self, bob: PeerId, state0: State0) { pub fn run(&mut self, bob: PeerId, state0: State0) {
self.inner.do_protocol_listener(bob, move |mut substream| { self.inner.do_protocol_listener(bob, move |mut substream| {
let protocol = async move { let protocol = async move {
let message0 =
serde_cbor::from_slice::<Message0>(&substream.read_message(BUF_SIZE).await?)
.context("Failed to deserialize message0")?;
let (swap_id, state1) = state0.receive(message0)?;
substream
.write_message(
&serde_cbor::to_vec(&state1.next_message())
.context("Failed to serialize message1")?,
)
.await?;
let message2 =
serde_cbor::from_slice::<Message2>(&substream.read_message(BUF_SIZE).await?)
.context("Failed to deserialize message2")?;
let state2 = state1
.receive(message2)
.context("Failed to receive Message2")?;
substream
.write_message(
&serde_cbor::to_vec(&state2.next_message())
.context("Failed to serialize message3")?,
)
.await?;
let message4 =
serde_cbor::from_slice::<Message4>(&substream.read_message(BUF_SIZE).await?)
.context("Failed to deserialize message4")?;
let state3 = state2.receive(message4)?;
Ok((bob, (swap_id, state3))) Ok((bob, (swap_id, state3)))
}; };

View File

@ -1,33 +1,75 @@
use crate::network::cbor_request_response::CborCodec; use anyhow::{Result, Context};
use crate::network::spot_price; use std::collections::VecDeque;
use crate::network::spot_price::{BlockchainNetwork, SpotPriceProtocol}; use std::fmt::Debug;
use crate::protocol::alice; use std::future;
use crate::protocol::alice::event_loop::LatestRate; use std::task::{Context, Poll};
use crate::{env, monero};
use futures::future::{BoxFuture, OptionFuture};
use libp2p::{Multiaddr, NetworkBehaviour, PeerId};
use libp2p::core::connection::ConnectionId;
use libp2p::core::{Endpoint, upgrade};
use libp2p::core::upgrade::from_fn;
use libp2p::core::upgrade::FromFnUpgrade;
use libp2p::request_response::{ use libp2p::request_response::{
ProtocolSupport, RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, ProtocolSupport, RequestResponseConfig, RequestResponseEvent, RequestResponseMessage,
ResponseChannel, ResponseChannel,
}; };
use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters, NetworkBehaviour, IntoProtocolsHandler, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, KeepAlive, SubstreamProtocol, NegotiatedSubstream}; use libp2p::swarm::{IntoProtocolsHandler, KeepAlive, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol};
use libp2p::{NetworkBehaviour, PeerId, Multiaddr}; use libp2p::swarm::protocols_handler::{InboundUpgradeSend, OutboundUpgradeSend};
use std::collections::VecDeque; use uuid::Uuid;
use std::fmt::Debug; use void::Void;
use std::task::{Context, Poll};
use libp2p::core::connection::ConnectionId; use crate::{env, monero};
use libp2p::swarm::protocols_handler::{OutboundUpgradeSend, InboundUpgradeSend}; use crate::network::cbor_request_response::CborCodec;
use libp2p::core:: use crate::network::spot_price;
use crate::network::spot_price::{BlockchainNetwork, SpotPriceProtocol};
use crate::protocol::{alice, bob, Message0, Message2, Message4};
use crate::protocol::alice::event_loop::LatestRate;
use crate::protocol::alice::{State3, State0};
use futures::FutureExt;
use tokio::sync::oneshot;
use serde::{Deserialize, Serialize};
#[derive(Debug)] #[derive(Debug)]
pub enum OutEvent { pub enum OutEvent {
ExecutionSetupParams { Initiated {
peer: PeerId, send_wallet_snapshot: oneshot::Sender<WalletSnapshot>
btc: bitcoin::Amount,
xmr: monero::Amount,
}, },
Error { Completed {
peer: PeerId, bob_peer_id: PeerId,
error: Error, swap_id: Uuid,
state3: State3,
}, },
Error, // TODO be more descriptive
}
pub struct WalletSnapshot {
balance: monero::Amount,
lock_fee: monero::Amount,
// TODO: Consider using the same address for punish and redeem (they are mutually exclusive, so effectively the address will only be used once)
redeem_address: bitcoin::Address,
punish_address: bitcoin::Address,
redeem_fee: bitcoin::Amount,
refund_fee: bitcoin::Amount,
}
impl WalletSnapshot {
pub async fn capture(bitcoin_wallet: &bitcoin::Wallet, monero_wallet: &monero::Wallet) -> Result<Self> {
Ok(Self {
balance: monero_wallet.get_balance().await?,
lock_fee: monero::MONERO_FEE,
redeem_address: bitcoin_wallet.new_address().await?,
punish_address: bitcoin_wallet.new_address().await?,
redeem_fee: bitcoin_wallet
.estimate_fee(bitcoin::TxRedeem::weight(), btc)
.await,
refund_fee: bitcoin_wallet
.estimate_fee(bitcoin::TxPunish::weight(), btc)
.await
})
}
} }
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
@ -36,24 +78,14 @@ where
LR: LatestRate + Send + 'static, LR: LatestRate + Send + 'static,
{ {
events: VecDeque<OutEvent>, events: VecDeque<OutEvent>,
balance: monero::Amount,
lock_fee: monero::Amount,
min_buy: bitcoin::Amount, min_buy: bitcoin::Amount,
max_buy: bitcoin::Amount, max_buy: bitcoin::Amount,
env_config: env::Config, env_config: env::Config,
unused_addresses: VecDeque<bitcoin::Address>,
redeem_fee: bitcoin::Amount,
refund_fee: bitcoin::Amount,
latest_rate: LR, latest_rate: LR,
resume_only: bool, resume_only: bool,
} }
/// Behaviour that handles spot prices.
/// All the logic how to react to a spot price request is contained here, events
/// reporting the successful handling of a spot price request or a failure are
/// bubbled up to the parent behaviour.
impl<LR> Behaviour<LR> impl<LR> Behaviour<LR>
where where
LR: LatestRate + Send + 'static, LR: LatestRate + Send + 'static,
@ -79,37 +111,17 @@ where
} }
} }
pub fn update_balance(&mut self, balance: monero::Amount) { pub fn update(&mut self, monero_balance: monero::Amount, redeem_address: bitcoin::Address, punish_address: bitcoin::Address) {
self.balance = balance; self.balance = monero_balance;
}
fn decline(
&mut self,
peer: PeerId,
channel: ResponseChannel<spot_price::Response>,
error: Error,
) {
if self
.behaviour
.send_response(
channel,
spot_price::Response::Error(error.to_error_response()),
)
.is_err()
{
tracing::debug!(%peer, "Unable to send error response for spot price request");
}
self.events.push_back(OutEvent::Error { peer, error });
} }
} }
impl<LR> NetworkBehaviour for Behaviour<LR> { impl<LR> NetworkBehaviour for Behaviour<LR> {
type ProtocolsHandler = (); type ProtocolsHandler = Handler;
type OutEvent = (); type OutEvent = OutEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler { fn new_handler(&mut self) -> Self::ProtocolsHandler {
todo!() Handler::default()
} }
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> { fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
@ -133,33 +145,189 @@ impl<LR> NetworkBehaviour for Behaviour<LR> {
} }
} }
enum InboundState { #[derive(Serialize, Deserialize, Debug, Clone)]
PendingBehaviour(NegotiatedSubstream) pub struct SpotPriceRequest {
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
pub btc: bitcoin::Amount,
pub blockchain_network: BlockchainNetwork,
} }
struct Handler { #[derive(Clone, Debug, Serialize, Deserialize)]
inbound_stream: Option<InboundState> pub enum SpotPriceError {
NoSwapsAccepted,
AmountBelowMinimum {
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
min: bitcoin::Amount,
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
buy: bitcoin::Amount,
},
AmountAboveMaximum {
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
max: bitcoin::Amount,
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
buy: bitcoin::Amount,
},
BalanceTooLow {
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
buy: bitcoin::Amount,
},
BlockchainNetworkMismatch {
cli: BlockchainNetwork,
asb: BlockchainNetwork,
},
/// To be used for errors that cannot be explained on the CLI side (e.g.
/// rate update problems on the seller side)
Other,
} }
// TODO: This is bob only.
// enum OutboundState {
// PendingOpen(
// // TODO: put data in here we pass in when we want to kick of swap setup, just bitcoin amount?
// ),
// PendingNegotiate,
// Executing(BoxFuture<'static, anyhow::Result<(Uuid, bob::State3)>>)
// }
// TODO: Don't just use anyhow::Error
type InboundStream = BoxFuture<'static, anyhow::Result<(Uuid, alice::State3)>>;
struct Handler {
inbound_stream: OptionFuture<InboundStream>,
events: VecDeque<HandlerOutEvent>,
resume_only: bool
}
enum HandlerOutEvent {
Initiated(oneshot::Sender<WalletSnapshot>),
Completed(anyhow::Result<(Uuid, alice::State3)>)
}
pub const BUF_SIZE: usize = 1024 * 1024;
impl ProtocolsHandler for Handler { impl ProtocolsHandler for Handler {
type InEvent = (); type InEvent = ();
type OutEvent = (); type OutEvent = HandlerOutEvent;
type Error = (); type Error = ();
type InboundProtocol = (); type InboundProtocol = protocol::SwapSetup;
type OutboundProtocol = (); type OutboundProtocol = ();
type InboundOpenInfo = (); type InboundOpenInfo = ();
type OutboundOpenInfo = (); type OutboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> { fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
todo!() SubstreamProtocol::new(protocol::new(), todo!("pass data down to handler"))
} }
fn inject_fully_negotiated_inbound(&mut self, protocol: NegotiatedSubstream, _: Self::InboundOpenInfo) { fn inject_fully_negotiated_inbound(&mut self, mut protocol: NegotiatedSubstream, _: Self::InboundOpenInfo) {
self.inbound_stream = Some(InboundState::PendingBehaviour(protocol)); let (sender, receiver) = oneshot::channel();
let resume_only = self.resume_only;
self.inbound_stream = OptionFuture::from(Some(async move {
let request = read_cbor_message::<SpotPriceRequest>(&mut protocol).await?;
let wallet_snapshot = receiver.await?; // TODO Put a timeout on this
async {
if resume_only {
return Err(Error::ResumeOnlyMode)
};
}
let blockchain_network = BlockchainNetwork {
bitcoin: self.env_config.bitcoin_network,
monero: self.env_config.monero_network,
};
if request.blockchain_network != blockchain_network {
self.decline(peer, channel, Error::BlockchainNetworkMismatch {
cli: request.blockchain_network,
asb: blockchain_network,
});
return;
}
let btc = request.btc;
if btc < self.min_buy {
self.decline(peer, channel, Error::AmountBelowMinimum {
min: self.min_buy,
buy: btc,
});
return;
}
if btc > self.max_buy {
self.decline(peer, channel, Error::AmountAboveMaximum {
max: self.max_buy,
buy: btc,
});
return;
}
let rate = match self.latest_rate.latest_rate() {
Ok(rate) => rate,
Err(e) => {
self.decline(peer, channel, Error::LatestRateFetchFailed(Box::new(e)));
return;
}
};
let xmr = match rate.sell_quote(btc) {
Ok(xmr) => xmr,
Err(e) => {
self.decline(peer, channel, Error::SellQuoteCalculationFailed(e));
return;
}
};
let xmr_balance = self.balance;
let xmr_lock_fees = self.lock_fee;
if xmr_balance < xmr + xmr_lock_fees {
self.decline(peer, channel, Error::BalanceTooLow {
balance: xmr_balance,
buy: btc,
});
return;
}
if self
.behaviour
.send_response(channel, spot_price::Response::Xmr(xmr))
.is_err()
{
tracing::error!(%peer, "Failed to send spot price response of {} for {}", xmr, btc)
}
let state0 = State0::new(spot_price_request.btc, todo!(), todo!(), todo!(), todo!(), todo!(), todo!(), todo!())?;
let message0 = read_cbor_message::<Message0>(&mut protocol).context("Failed to deserialize message0")?;
let (swap_id, state1) = state0.receive(message0)?;
write_cbor_message(&mut protocol, state1.next_message()).await?;
let message2 = read_cbor_message::<Message2>(&mut protocol).context("Failed to deserialize message2")?;
let state2 = state1
.receive(message2)
.context("Failed to receive Message2")?;
write_cbor_message(&mut protocol, state2.next_message()).await?;
let message4 = read_cbor_message::<Message4>(&mut protocol).context("Failed to deserialize message4")?;
let state3 = state2
.receive(message4)
.context("Failed to receive Message4")?;
Ok((swap_id, state3))
}.boxed()));
self.events.push_back(HandlerOutEvent::Initiated(sender));
} }
fn inject_fully_negotiated_outbound(&mut self, protocol: _, info: Self::OutboundOpenInfo) { fn inject_fully_negotiated_outbound(&mut self, protocol: NegotiatedSubstream, info: Self::OutboundOpenInfo) {
todo!() unreachable!("we don't support outbound")
} }
fn inject_event(&mut self, event: Self::InEvent) { fn inject_event(&mut self, event: Self::InEvent) {
@ -175,145 +343,60 @@ impl ProtocolsHandler for Handler {
} }
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>> { fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>> {
todo!() let event = futures::ready!(self.inbound_stream.poll(cx));
Poll::Ready(ProtocolsHandlerEvent::Custom(HandlerOutEvent::Completed(event)))
} }
} }
pub fn new() -> SwapSetup { async fn read_cbor_message<T>(substream: &mut NegotiatedSubstream) -> Result<T> where T: Deserialize {
from_fn( let bytes = upgrade::read_one(substream, BUF_SIZE).await?;
b"/rendezvous/1.0.0", let mut de = serde_cbor::Deserializer::from_slice(&bytes);
Box::new(|socket, _| future::ready(Ok(socket))), let message = T::deserialize(de)?;
)
Ok(message)
} }
pub type SwapSetup = FromFnUpgrade< async fn write_cbor_message<T>(substream: &mut NegotiatedSubstream, message: T) -> Result<()> where T: Serialize {
&'static [u8], let bytes = serde_cbor::to_vec(&message)?;
Box< upgrade::write_one(substream, &bytes).await?;
dyn Fn(
NegotiatedSubstream, Ok(())
Endpoint, }
async fn write_error_message(substream: &mut NegotiatedSubstream, message: impl Into<SpotPriceError>) -> Result<()> {
let bytes = serde_cbor::to_vec(&message.into())?;
upgrade::write_one(substream, &bytes).await?;
Ok(())
}
mod protocol {
use super::*;
pub fn new() -> SwapSetup {
from_fn(
b"/comit/xmr/btc/swap_setup/1.0.0",
Box::new(|socket, endpoint| future::ready(match endpoint {
Endpoint::Listener => Ok(socket),
Endpoint::Dialer => todo!("return error")
})),
) )
-> future::Ready<Result<NegotiatedSubstream, Void>>
+ Send
+ 'static,
>,
>;
impl<LR> NetworkBehaviourEventProcess<spot_price::OutEvent> for Behaviour<LR>
where
LR: LatestRate + Send + 'static,
{
fn inject_event(&mut self, event: spot_price::OutEvent) {
let (peer, message) = match event {
RequestResponseEvent::Message { peer, message } => (peer, message),
RequestResponseEvent::OutboundFailure { peer, error, .. } => {
tracing::error!(%peer, "Failure sending spot price response: {:#}", error);
return;
}
RequestResponseEvent::InboundFailure { peer, error, .. } => {
tracing::warn!(%peer, "Inbound failure when handling spot price request: {:#}", error);
return;
}
RequestResponseEvent::ResponseSent { peer, .. } => {
tracing::debug!(%peer, "Spot price response sent");
return;
}
};
let (request, channel) = match message {
RequestResponseMessage::Request {
request, channel, ..
} => (request, channel),
RequestResponseMessage::Response { .. } => {
tracing::error!("Unexpected message");
return;
}
};
let blockchain_network = BlockchainNetwork {
bitcoin: self.env_config.bitcoin_network,
monero: self.env_config.monero_network,
};
if request.blockchain_network != blockchain_network {
self.decline(peer, channel, Error::BlockchainNetworkMismatch {
cli: request.blockchain_network,
asb: blockchain_network,
});
return;
}
if self.resume_only {
self.decline(peer, channel, Error::ResumeOnlyMode);
return;
}
let btc = request.btc;
if btc < self.min_buy {
self.decline(peer, channel, Error::AmountBelowMinimum {
min: self.min_buy,
buy: btc,
});
return;
}
if btc > self.max_buy {
self.decline(peer, channel, Error::AmountAboveMaximum {
max: self.max_buy,
buy: btc,
});
return;
}
let rate = match self.latest_rate.latest_rate() {
Ok(rate) => rate,
Err(e) => {
self.decline(peer, channel, Error::LatestRateFetchFailed(Box::new(e)));
return;
}
};
let xmr = match rate.sell_quote(btc) {
Ok(xmr) => xmr,
Err(e) => {
self.decline(peer, channel, Error::SellQuoteCalculationFailed(e));
return;
}
};
let xmr_balance = self.balance;
let xmr_lock_fees = self.lock_fee;
if xmr_balance < xmr + xmr_lock_fees {
self.decline(peer, channel, Error::BalanceTooLow {
balance: xmr_balance,
buy: btc,
});
return;
}
if self
.behaviour
.send_response(channel, spot_price::Response::Xmr(xmr))
.is_err()
{
tracing::error!(%peer, "Failed to send spot price response of {} for {}", xmr, btc)
}
self.events
.push_back(OutEvent::ExecutionSetupParams { peer, btc, xmr });
} }
}
impl From<OutEvent> for alice::OutEvent { pub type SwapSetup = FromFnUpgrade<
fn from(event: OutEvent) -> Self { &'static [u8],
match event { Box<
OutEvent::ExecutionSetupParams { peer, btc, xmr } => { dyn Fn(
Self::ExecutionSetupStart { peer, btc, xmr } NegotiatedSubstream,
} Endpoint,
OutEvent::Error { peer, error } => Self::SwapRequestDeclined { peer, error }, )
} -> future::Ready<Result<NegotiatedSubstream, Void>>
} + Send
+ 'static,
>,
>;
} }
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
@ -374,16 +457,18 @@ impl Error {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*;
use crate::asb::Rate;
use crate::env::GetConfig;
use crate::monero;
use crate::network::test::{await_events_or_timeout, connect, new_swarm};
use crate::protocol::{alice, bob};
use anyhow::anyhow; use anyhow::anyhow;
use libp2p::Swarm; use libp2p::Swarm;
use rust_decimal::Decimal; use rust_decimal::Decimal;
use crate::{monero, network};
use crate::asb::Rate;
use crate::env::GetConfig;
use crate::network::test::{await_events_or_timeout, connect, new_swarm};
use crate::protocol::{alice, bob};
use super::*;
impl Default for AliceBehaviourValues { impl Default for AliceBehaviourValues {
fn default() -> Self { fn default() -> Self {
Self { Self {