mirror of
https://github.com/comit-network/xmr-btc-swap.git
synced 2024-10-01 01:45:40 -04:00
Quote websocket
Instead of just sending a price we properly create a bid quote analogue to the libp2p quote protocol. The service websocket sends `BidQuote`s or an Error. The `BidQuote` sent reuses the struct from the libp2p quote protocol.
This commit is contained in:
parent
b8f689bda0
commit
dd624b1df9
@ -1,6 +1,6 @@
|
|||||||
pub mod command;
|
pub mod command;
|
||||||
pub mod config;
|
pub mod config;
|
||||||
pub mod price_websocket;
|
pub mod quote_websocket;
|
||||||
mod rate;
|
mod rate;
|
||||||
pub mod tracing;
|
pub mod tracing;
|
||||||
|
|
||||||
|
@ -1,28 +0,0 @@
|
|||||||
use crate::kraken::PriceUpdates;
|
|
||||||
use anyhow::Context;
|
|
||||||
use futures::{stream, StreamExt, TryStreamExt};
|
|
||||||
use warp::ws::{Message, WebSocket};
|
|
||||||
|
|
||||||
pub async fn latest_rate(ws: WebSocket, subscription: PriceUpdates) {
|
|
||||||
let stream = stream::try_unfold(subscription.inner, |mut receiver| async move {
|
|
||||||
// todo print error message but don't forward it to the user and don't panic
|
|
||||||
receiver
|
|
||||||
.changed()
|
|
||||||
.await
|
|
||||||
.context("failed to receive latest rate update")
|
|
||||||
.expect("Should not fail :)");
|
|
||||||
|
|
||||||
let latest_rate = receiver.borrow().clone().expect("Should work");
|
|
||||||
|
|
||||||
// TODO: Proper definition of what to send over the wire
|
|
||||||
// TODO: Properly calculate the actual rate (using spread) and add min and max
|
|
||||||
// amounts tradeable
|
|
||||||
let msg = Message::text(serde_json::to_string(&latest_rate).expect("to serialize"));
|
|
||||||
|
|
||||||
Ok(Some((msg, receiver)))
|
|
||||||
})
|
|
||||||
.into_stream();
|
|
||||||
|
|
||||||
let (ws_tx, mut _ws_rx) = ws.split();
|
|
||||||
tokio::task::spawn(stream.forward(ws_tx));
|
|
||||||
}
|
|
88
swap/src/asb/quote_websocket.rs
Normal file
88
swap/src/asb/quote_websocket.rs
Normal file
@ -0,0 +1,88 @@
|
|||||||
|
use crate::asb::Rate;
|
||||||
|
use crate::kraken::PriceUpdates;
|
||||||
|
use crate::network::quote::BidQuote;
|
||||||
|
use anyhow::Context;
|
||||||
|
use futures::{stream, StreamExt, TryStreamExt};
|
||||||
|
use rust_decimal::Decimal;
|
||||||
|
use serde::Serialize;
|
||||||
|
use warp::ws::{Message, WebSocket};
|
||||||
|
use warp::Filter;
|
||||||
|
|
||||||
|
pub async fn setup_quote_websocket(
|
||||||
|
price_updates: PriceUpdates,
|
||||||
|
spread: Decimal,
|
||||||
|
min: bitcoin::Amount,
|
||||||
|
max: bitcoin::Amount,
|
||||||
|
) {
|
||||||
|
let latest_quote = warp::get()
|
||||||
|
.and(warp::path!("api" / "quote" / "xmr-btc"))
|
||||||
|
.and(warp::ws())
|
||||||
|
.map(move |ws: warp::ws::Ws| {
|
||||||
|
let price_updates = price_updates.clone();
|
||||||
|
tracing::info!("New quote websocket connection");
|
||||||
|
ws.on_upgrade(move |socket| {
|
||||||
|
quote_stream(socket, price_updates, spread, min, max)
|
||||||
|
})
|
||||||
|
});
|
||||||
|
tokio::spawn(async move {
|
||||||
|
warp::serve(latest_quote).run(([0, 0, 0, 0], 3030)).await;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn quote_stream(
|
||||||
|
ws: WebSocket,
|
||||||
|
subscription: PriceUpdates,
|
||||||
|
spread: Decimal,
|
||||||
|
min: bitcoin::Amount,
|
||||||
|
max: bitcoin::Amount,
|
||||||
|
) {
|
||||||
|
let stream = stream::try_unfold(subscription.inner, move |mut receiver| async move {
|
||||||
|
if let Err(e) = receiver.changed().await {
|
||||||
|
tracing::error!(
|
||||||
|
"Failed to initialize price update stream for quote websocket: {:#}",
|
||||||
|
e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
let quote = match receiver.borrow().clone() {
|
||||||
|
Ok(latest_price_update) => match Rate::new(latest_price_update.ask, spread).ask() {
|
||||||
|
Ok(amount) => WebsocketBidQuote::Quote(BidQuote {
|
||||||
|
price: amount,
|
||||||
|
min_quantity: min,
|
||||||
|
max_quantity: max,
|
||||||
|
}),
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("Failed to create quote for quote websocket: {:#}", e);
|
||||||
|
WebsocketBidQuote::Error(Error::BidQuoteError)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(
|
||||||
|
"Failed to fetch latest price update for quote websocket: {:#}",
|
||||||
|
e
|
||||||
|
);
|
||||||
|
WebsocketBidQuote::Error(Error::PriceUpdateError)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let msg = Message::text(serde_json::to_string("e).expect("quote to serialize"));
|
||||||
|
|
||||||
|
Ok(Some((msg, receiver)))
|
||||||
|
})
|
||||||
|
.into_stream();
|
||||||
|
|
||||||
|
let (ws_tx, mut _ws_rx) = ws.split();
|
||||||
|
tokio::task::spawn(stream.forward(ws_tx));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Debug, Clone)]
|
||||||
|
enum WebsocketBidQuote {
|
||||||
|
Quote(BidQuote),
|
||||||
|
Error(Error),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Serialize)]
|
||||||
|
enum Error {
|
||||||
|
PriceUpdateError,
|
||||||
|
BidQuoteError,
|
||||||
|
}
|
@ -26,9 +26,8 @@ use swap::asb::command::{parse_args, Arguments, Command};
|
|||||||
use swap::asb::config::{
|
use swap::asb::config::{
|
||||||
initial_setup, query_user_for_initial_config, read_config, Config, ConfigNotInitialized,
|
initial_setup, query_user_for_initial_config, read_config, Config, ConfigNotInitialized,
|
||||||
};
|
};
|
||||||
use swap::asb::price_websocket::latest_rate;
|
use swap::asb::quote_websocket::setup_quote_websocket;
|
||||||
use swap::database::Database;
|
use swap::database::Database;
|
||||||
use swap::kraken::PriceUpdates;
|
|
||||||
use swap::monero::Amount;
|
use swap::monero::Amount;
|
||||||
use swap::network::swarm;
|
use swap::network::swarm;
|
||||||
use swap::protocol::alice;
|
use swap::protocol::alice;
|
||||||
@ -39,7 +38,6 @@ use swap::tor::AuthenticatedClient;
|
|||||||
use swap::{asb, bitcoin, kraken, monero, tor};
|
use swap::{asb, bitcoin, kraken, monero, tor};
|
||||||
use tracing::{debug, info, warn};
|
use tracing::{debug, info, warn};
|
||||||
use tracing_subscriber::filter::LevelFilter;
|
use tracing_subscriber::filter::LevelFilter;
|
||||||
use warp::Filter;
|
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate prettytable;
|
extern crate prettytable;
|
||||||
@ -130,7 +128,13 @@ async fn main() -> Result<()> {
|
|||||||
|
|
||||||
let kraken_price_updates = kraken::connect()?;
|
let kraken_price_updates = kraken::connect()?;
|
||||||
|
|
||||||
setup_price_websocket(kraken_price_updates.clone()).await;
|
setup_quote_websocket(
|
||||||
|
kraken_price_updates.clone(),
|
||||||
|
config.maker.ask_spread,
|
||||||
|
config.maker.min_buy_btc,
|
||||||
|
config.maker.max_buy_btc,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
// setup Tor hidden services
|
// setup Tor hidden services
|
||||||
let tor_client =
|
let tor_client =
|
||||||
@ -301,20 +305,6 @@ async fn main() -> Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn setup_price_websocket(price_updates: PriceUpdates) {
|
|
||||||
let latest_rate = warp::get()
|
|
||||||
.and(warp::path!("api" / "price" / "xmr-btc"))
|
|
||||||
.and(warp::ws())
|
|
||||||
.map(move |ws: warp::ws::Ws| {
|
|
||||||
let price_updates = price_updates.clone();
|
|
||||||
info!("New price websocket connection");
|
|
||||||
ws.on_upgrade(move |socket| latest_rate(socket, price_updates))
|
|
||||||
});
|
|
||||||
tokio::spawn(async move {
|
|
||||||
warp::serve(latest_rate).run(([0, 0, 0, 0], 3030)).await;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn init_bitcoin_wallet(
|
async fn init_bitcoin_wallet(
|
||||||
config: &Config,
|
config: &Config,
|
||||||
seed: &Seed,
|
seed: &Seed,
|
||||||
|
Loading…
Reference in New Issue
Block a user