mirror of
https://github.com/comit-network/xmr-btc-swap.git
synced 2024-10-01 01:45:40 -04:00
Provide stronger isolation of kraken module
Instead of leaking the tokio::sync:⌚:Receiver type in our
return value, we create a newtype that implements the desired
interface. This allows us to get rid of the `RateService` structs
and instead implement `LatestRate` directly on top of this struct.
Given that `LatestRate` is only used within the event_loop module,
we move the definition of this type into there.
This commit is contained in:
parent
f6ed4d65b5
commit
1822886cd0
@ -1,13 +1,7 @@
|
|||||||
mod amounts;
|
|
||||||
pub mod command;
|
pub mod command;
|
||||||
pub mod config;
|
pub mod config;
|
||||||
pub mod fixed_rate;
|
mod fixed_rate;
|
||||||
pub mod kraken;
|
mod rate;
|
||||||
|
|
||||||
pub use amounts::Rate;
|
pub use self::fixed_rate::FixedRate;
|
||||||
|
pub use self::rate::Rate;
|
||||||
pub trait LatestRate {
|
|
||||||
type Error: std::error::Error + Send + Sync + 'static;
|
|
||||||
|
|
||||||
fn latest_rate(&mut self) -> Result<Rate, Self::Error>;
|
|
||||||
}
|
|
||||||
|
@ -1,23 +1,20 @@
|
|||||||
use crate::asb::{LatestRate, Rate};
|
use crate::asb::Rate;
|
||||||
use std::convert::Infallible;
|
|
||||||
|
|
||||||
pub const RATE: f64 = 0.01;
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct FixedRate(Rate);
|
||||||
|
|
||||||
#[derive(Clone)]
|
impl FixedRate {
|
||||||
pub struct RateService(Rate);
|
pub const RATE: f64 = 0.01;
|
||||||
|
|
||||||
impl LatestRate for RateService {
|
pub fn value(&self) -> Rate {
|
||||||
type Error = Infallible;
|
self.0
|
||||||
|
|
||||||
fn latest_rate(&mut self) -> Result<Rate, Infallible> {
|
|
||||||
Ok(self.0)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for RateService {
|
impl Default for FixedRate {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self(Rate {
|
Self(Rate {
|
||||||
ask: bitcoin::Amount::from_btc(RATE).expect("Static value should never fail"),
|
ask: bitcoin::Amount::from_btc(Self::RATE).expect("Static value should never fail"),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,25 +0,0 @@
|
|||||||
use crate::asb::{LatestRate, Rate};
|
|
||||||
use crate::kraken;
|
|
||||||
use anyhow::Result;
|
|
||||||
use tokio::sync::watch::Receiver;
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct RateService {
|
|
||||||
receiver: Receiver<Result<Rate, kraken::Error>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl LatestRate for RateService {
|
|
||||||
type Error = kraken::Error;
|
|
||||||
|
|
||||||
fn latest_rate(&mut self) -> Result<Rate, Self::Error> {
|
|
||||||
(*self.receiver.borrow()).clone()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl RateService {
|
|
||||||
pub async fn new() -> Result<Self> {
|
|
||||||
Ok(Self {
|
|
||||||
receiver: kraken::connect().await?,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
@ -23,7 +23,6 @@ use swap::asb::command::{Arguments, Command};
|
|||||||
use swap::asb::config::{
|
use swap::asb::config::{
|
||||||
initial_setup, query_user_for_initial_testnet_config, read_config, Config, ConfigNotInitialized,
|
initial_setup, query_user_for_initial_testnet_config, read_config, Config, ConfigNotInitialized,
|
||||||
};
|
};
|
||||||
use swap::asb::kraken;
|
|
||||||
use swap::database::Database;
|
use swap::database::Database;
|
||||||
use swap::execution_params::GetExecutionParams;
|
use swap::execution_params::GetExecutionParams;
|
||||||
use swap::fs::default_config_path;
|
use swap::fs::default_config_path;
|
||||||
@ -31,7 +30,7 @@ use swap::monero::Amount;
|
|||||||
use swap::protocol::alice::EventLoop;
|
use swap::protocol::alice::EventLoop;
|
||||||
use swap::seed::Seed;
|
use swap::seed::Seed;
|
||||||
use swap::trace::init_tracing;
|
use swap::trace::init_tracing;
|
||||||
use swap::{bitcoin, execution_params, monero};
|
use swap::{bitcoin, execution_params, kraken, monero};
|
||||||
use tracing::{info, warn};
|
use tracing::{info, warn};
|
||||||
use tracing_subscriber::filter::LevelFilter;
|
use tracing_subscriber::filter::LevelFilter;
|
||||||
|
|
||||||
@ -93,7 +92,7 @@ async fn main() -> Result<()> {
|
|||||||
bitcoin_wallet.new_address().await?
|
bitcoin_wallet.new_address().await?
|
||||||
);
|
);
|
||||||
|
|
||||||
let rate_service = kraken::RateService::new().await?;
|
let kraken_rate_updates = kraken::connect().await?;
|
||||||
|
|
||||||
let (event_loop, _) = EventLoop::new(
|
let (event_loop, _) = EventLoop::new(
|
||||||
config.network.listen,
|
config.network.listen,
|
||||||
@ -102,7 +101,7 @@ async fn main() -> Result<()> {
|
|||||||
Arc::new(bitcoin_wallet),
|
Arc::new(bitcoin_wallet),
|
||||||
Arc::new(monero_wallet),
|
Arc::new(monero_wallet),
|
||||||
Arc::new(db),
|
Arc::new(db),
|
||||||
rate_service,
|
kraken_rate_updates,
|
||||||
max_buy,
|
max_buy,
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
@ -11,9 +11,7 @@ async fn main() -> Result<()> {
|
|||||||
.context("Failed to connect to kraken")?;
|
.context("Failed to connect to kraken")?;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
ticker.changed().await?;
|
match ticker.wait_for_update().await? {
|
||||||
|
|
||||||
match &*ticker.borrow() {
|
|
||||||
Ok(rate) => println!("Rate update: {}", rate),
|
Ok(rate) => println!("Rate update: {}", rate),
|
||||||
Err(e) => println!("Error: {:#}", e),
|
Err(e) => println!("Error: {:#}", e),
|
||||||
}
|
}
|
||||||
|
@ -10,7 +10,7 @@ use tokio::sync::watch;
|
|||||||
use tokio_tungstenite::tungstenite;
|
use tokio_tungstenite::tungstenite;
|
||||||
use tracing::{error, trace};
|
use tracing::{error, trace};
|
||||||
|
|
||||||
pub async fn connect() -> Result<watch::Receiver<Result<Rate, Error>>> {
|
pub async fn connect() -> Result<RateUpdateStream> {
|
||||||
let (rate_update, rate_update_receiver) = watch::channel(Err(Error::NotYetRetrieved));
|
let (rate_update, rate_update_receiver) = watch::channel(Err(Error::NotYetRetrieved));
|
||||||
|
|
||||||
let (rate_stream, _response) =
|
let (rate_stream, _response) =
|
||||||
@ -88,7 +88,26 @@ pub async fn connect() -> Result<watch::Receiver<Result<Rate, Error>>> {
|
|||||||
.send(SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD.into())
|
.send(SUBSCRIBE_XMR_BTC_TICKER_PAYLOAD.into())
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(rate_update_receiver)
|
Ok(RateUpdateStream {
|
||||||
|
inner: rate_update_receiver,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct RateUpdateStream {
|
||||||
|
inner: watch::Receiver<Result<Rate, Error>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RateUpdateStream {
|
||||||
|
pub async fn wait_for_update(&mut self) -> Result<Result<Rate, Error>> {
|
||||||
|
self.inner.changed().await?;
|
||||||
|
|
||||||
|
Ok(self.inner.borrow().clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn latest_update(&mut self) -> Result<Rate, Error> {
|
||||||
|
self.inner.borrow().clone()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const KRAKEN_WS_URL: &str = "wss://ws.kraken.com";
|
const KRAKEN_WS_URL: &str = "wss://ws.kraken.com";
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
use crate::asb::LatestRate;
|
use crate::asb::{FixedRate, Rate};
|
||||||
use crate::database::Database;
|
use crate::database::Database;
|
||||||
use crate::execution_params::ExecutionParams;
|
use crate::execution_params::ExecutionParams;
|
||||||
use crate::monero::BalanceTooLow;
|
use crate::monero::BalanceTooLow;
|
||||||
@ -8,13 +8,14 @@ use crate::protocol::alice;
|
|||||||
use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State3, Swap, TransferProof};
|
use crate::protocol::alice::{AliceState, Behaviour, OutEvent, State3, Swap, TransferProof};
|
||||||
use crate::protocol::bob::EncryptedSignature;
|
use crate::protocol::bob::EncryptedSignature;
|
||||||
use crate::seed::Seed;
|
use crate::seed::Seed;
|
||||||
use crate::{bitcoin, monero};
|
use crate::{bitcoin, kraken, monero};
|
||||||
use anyhow::{bail, Context, Result};
|
use anyhow::{bail, Context, Result};
|
||||||
use futures::future::RemoteHandle;
|
use futures::future::RemoteHandle;
|
||||||
use libp2p::core::Multiaddr;
|
use libp2p::core::Multiaddr;
|
||||||
use libp2p::futures::FutureExt;
|
use libp2p::futures::FutureExt;
|
||||||
use libp2p::{PeerId, Swarm};
|
use libp2p::{PeerId, Swarm};
|
||||||
use rand::rngs::OsRng;
|
use rand::rngs::OsRng;
|
||||||
|
use std::convert::Infallible;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::mpsc::error::SendError;
|
use tokio::sync::mpsc::error::SendError;
|
||||||
use tokio::sync::{broadcast, mpsc};
|
use tokio::sync::{broadcast, mpsc};
|
||||||
@ -80,7 +81,7 @@ pub struct EventLoop<RS> {
|
|||||||
bitcoin_wallet: Arc<bitcoin::Wallet>,
|
bitcoin_wallet: Arc<bitcoin::Wallet>,
|
||||||
monero_wallet: Arc<monero::Wallet>,
|
monero_wallet: Arc<monero::Wallet>,
|
||||||
db: Arc<Database>,
|
db: Arc<Database>,
|
||||||
rate_service: RS,
|
latest_rate: RS,
|
||||||
max_buy: bitcoin::Amount,
|
max_buy: bitcoin::Amount,
|
||||||
|
|
||||||
recv_encrypted_signature: broadcast::Sender<EncryptedSignature>,
|
recv_encrypted_signature: broadcast::Sender<EncryptedSignature>,
|
||||||
@ -92,9 +93,31 @@ pub struct EventLoop<RS> {
|
|||||||
swap_handle_sender: mpsc::Sender<RemoteHandle<Result<AliceState>>>,
|
swap_handle_sender: mpsc::Sender<RemoteHandle<Result<AliceState>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<RS> EventLoop<RS>
|
pub trait LatestRate {
|
||||||
|
type Error: std::error::Error + Send + Sync + 'static;
|
||||||
|
|
||||||
|
fn latest_rate(&mut self) -> Result<Rate, Self::Error>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LatestRate for FixedRate {
|
||||||
|
type Error = Infallible;
|
||||||
|
|
||||||
|
fn latest_rate(&mut self) -> Result<Rate, Self::Error> {
|
||||||
|
Ok(self.value())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LatestRate for kraken::RateUpdateStream {
|
||||||
|
type Error = kraken::Error;
|
||||||
|
|
||||||
|
fn latest_rate(&mut self) -> Result<Rate, Self::Error> {
|
||||||
|
self.latest_update()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<LR> EventLoop<LR>
|
||||||
where
|
where
|
||||||
RS: LatestRate,
|
LR: LatestRate,
|
||||||
{
|
{
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub fn new(
|
pub fn new(
|
||||||
@ -104,7 +127,7 @@ where
|
|||||||
bitcoin_wallet: Arc<bitcoin::Wallet>,
|
bitcoin_wallet: Arc<bitcoin::Wallet>,
|
||||||
monero_wallet: Arc<monero::Wallet>,
|
monero_wallet: Arc<monero::Wallet>,
|
||||||
db: Arc<Database>,
|
db: Arc<Database>,
|
||||||
rate_service: RS,
|
latest_rate: LR,
|
||||||
max_buy: bitcoin::Amount,
|
max_buy: bitcoin::Amount,
|
||||||
) -> Result<(Self, mpsc::Receiver<RemoteHandle<Result<AliceState>>>)> {
|
) -> Result<(Self, mpsc::Receiver<RemoteHandle<Result<AliceState>>>)> {
|
||||||
let identity = seed.derive_libp2p_identity();
|
let identity = seed.derive_libp2p_identity();
|
||||||
@ -132,7 +155,7 @@ where
|
|||||||
bitcoin_wallet,
|
bitcoin_wallet,
|
||||||
monero_wallet,
|
monero_wallet,
|
||||||
db,
|
db,
|
||||||
rate_service,
|
latest_rate,
|
||||||
recv_encrypted_signature: recv_encrypted_signature.sender,
|
recv_encrypted_signature: recv_encrypted_signature.sender,
|
||||||
send_transfer_proof: send_transfer_proof.receiver,
|
send_transfer_proof: send_transfer_proof.receiver,
|
||||||
send_transfer_proof_sender: send_transfer_proof.sender,
|
send_transfer_proof_sender: send_transfer_proof.sender,
|
||||||
@ -239,7 +262,7 @@ where
|
|||||||
monero_wallet: Arc<monero::Wallet>,
|
monero_wallet: Arc<monero::Wallet>,
|
||||||
) -> Result<monero::Amount> {
|
) -> Result<monero::Amount> {
|
||||||
let rate = self
|
let rate = self
|
||||||
.rate_service
|
.latest_rate
|
||||||
.latest_rate()
|
.latest_rate()
|
||||||
.context("Failed to get latest rate")?;
|
.context("Failed to get latest rate")?;
|
||||||
|
|
||||||
@ -265,7 +288,7 @@ where
|
|||||||
|
|
||||||
async fn make_quote(&mut self, max_buy: bitcoin::Amount) -> Result<BidQuote> {
|
async fn make_quote(&mut self, max_buy: bitcoin::Amount) -> Result<BidQuote> {
|
||||||
let rate = self
|
let rate = self
|
||||||
.rate_service
|
.latest_rate
|
||||||
.latest_rate()
|
.latest_rate()
|
||||||
.context("Failed to get latest rate")?;
|
.context("Failed to get latest rate")?;
|
||||||
|
|
||||||
|
@ -14,8 +14,7 @@ use std::convert::Infallible;
|
|||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use swap::asb::fixed_rate;
|
use swap::asb::FixedRate;
|
||||||
use swap::asb::fixed_rate::RATE;
|
|
||||||
use swap::bitcoin::{CancelTimelock, PunishTimelock};
|
use swap::bitcoin::{CancelTimelock, PunishTimelock};
|
||||||
use swap::database::Database;
|
use swap::database::Database;
|
||||||
use swap::execution_params::{ExecutionParams, GetExecutionParams};
|
use swap::execution_params::{ExecutionParams, GetExecutionParams};
|
||||||
@ -344,7 +343,7 @@ where
|
|||||||
let (monero, containers) = testutils::init_containers(&cli).await;
|
let (monero, containers) = testutils::init_containers(&cli).await;
|
||||||
|
|
||||||
let btc_amount = bitcoin::Amount::from_sat(1_000_000);
|
let btc_amount = bitcoin::Amount::from_sat(1_000_000);
|
||||||
let xmr_amount = monero::Amount::from_monero(btc_amount.as_btc() / RATE).unwrap();
|
let xmr_amount = monero::Amount::from_monero(btc_amount.as_btc() / FixedRate::RATE).unwrap();
|
||||||
|
|
||||||
let alice_starting_balances = StartingBalances {
|
let alice_starting_balances = StartingBalances {
|
||||||
xmr: xmr_amount * 10,
|
xmr: xmr_amount * 10,
|
||||||
@ -410,7 +409,7 @@ where
|
|||||||
alice_bitcoin_wallet.clone(),
|
alice_bitcoin_wallet.clone(),
|
||||||
alice_monero_wallet.clone(),
|
alice_monero_wallet.clone(),
|
||||||
alice_db,
|
alice_db,
|
||||||
fixed_rate::RateService::default(),
|
FixedRate::default(),
|
||||||
bitcoin::Amount::ONE_BTC,
|
bitcoin::Amount::ONE_BTC,
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
Loading…
Reference in New Issue
Block a user