[wip] rate update via sse

This commit is contained in:
Philipp Hoenisch 2021-06-03 14:46:56 +10:00
parent d5d0dda6e7
commit 20ea217e7b
No known key found for this signature in database
GPG Key ID: E5F8E74C672BC666
5 changed files with 232 additions and 7 deletions

150
Cargo.lock generated
View File

@ -474,6 +474,16 @@ dependencies = [
"serde",
]
[[package]]
name = "buf_redux"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b953a6887648bb07a535631f2bc00fbdb2a2216f135552cb3f534ed136b9c07f"
dependencies = [
"memchr",
"safemem",
]
[[package]]
name = "bumpalo"
version = "3.6.1"
@ -1338,6 +1348,31 @@ dependencies = [
"ahash",
]
[[package]]
name = "headers"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0b7591fb62902706ae8e7aaff416b1b0fa2c0fd0878b46dc13baa3712d8a855"
dependencies = [
"base64 0.13.0",
"bitflags",
"bytes 1.0.1",
"headers-core",
"http",
"mime 0.3.16",
"sha-1",
"time 0.1.43",
]
[[package]]
name = "headers-core"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429"
dependencies = [
"http",
]
[[package]]
name = "heck"
version = "0.3.2"
@ -1474,7 +1509,7 @@ dependencies = [
"time 0.1.43",
"traitobject",
"typeable",
"unicase",
"unicase 1.4.2",
"url 1.7.2",
]
@ -2152,6 +2187,16 @@ version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
[[package]]
name = "mime_guess"
version = "2.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2684d4c2e97d99848d30b324b00c8fcc7e5c897b7cbb5819b09e7c90e8baf212"
dependencies = [
"mime 0.3.16",
"unicase 2.6.0",
]
[[package]]
name = "minicbor"
version = "0.8.0"
@ -2322,6 +2367,24 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "multipart"
version = "0.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d050aeedc89243f5347c3e237e3e13dc76fbe4ae3742a57b94dc14f69acf76d4"
dependencies = [
"buf_redux",
"httparse",
"log 0.4.14",
"mime 0.3.16",
"mime_guess",
"quick-error 1.2.3",
"rand 0.7.3",
"safemem",
"tempfile",
"twoway",
]
[[package]]
name = "multistream-select"
version = "0.10.2"
@ -3934,7 +3997,7 @@ dependencies = [
"tokio",
"tokio-socks",
"tokio-tar",
"tokio-tungstenite",
"tokio-tungstenite 0.14.0",
"tokio-util",
"toml",
"torut",
@ -3945,6 +4008,7 @@ dependencies = [
"url 2.2.2",
"uuid",
"void",
"warp",
"zip",
]
@ -4238,6 +4302,19 @@ dependencies = [
"xattr",
]
[[package]]
name = "tokio-tungstenite"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1a5f475f1b9d077ea1017ecbc60890fda8e54942d680ca0b1d2b47cfa2d861b"
dependencies = [
"futures-util",
"log 0.4.14",
"pin-project 1.0.5",
"tokio",
"tungstenite 0.12.0",
]
[[package]]
name = "tokio-tungstenite"
version = "0.14.0"
@ -4250,7 +4327,7 @@ dependencies = [
"rustls 0.19.0",
"tokio",
"tokio-rustls",
"tungstenite",
"tungstenite 0.13.0",
"webpki",
"webpki-roots 0.21.0",
]
@ -4309,6 +4386,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d"
dependencies = [
"cfg-if 1.0.0",
"log 0.4.14",
"pin-project-lite 0.2.6",
"tracing-attributes",
"tracing-core",
@ -4456,6 +4534,25 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "tungstenite"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ada8297e8d70872fa9a551d93250a9f407beb9f37ef86494eb20012a2ff7c24"
dependencies = [
"base64 0.13.0",
"byteorder",
"bytes 1.0.1",
"http",
"httparse",
"input_buffer",
"log 0.4.14",
"rand 0.8.3",
"sha-1",
"url 2.2.2",
"utf-8",
]
[[package]]
name = "tungstenite"
version = "0.13.0"
@ -4479,6 +4576,15 @@ dependencies = [
"webpki-roots 0.21.0",
]
[[package]]
name = "twoway"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59b11b2b5241ba34be09c3cc85a36e56e48f9888862e19cedf23336d35316ed1"
dependencies = [
"memchr",
]
[[package]]
name = "typeable"
version = "0.1.2"
@ -4518,6 +4624,15 @@ dependencies = [
"version_check 0.1.5",
]
[[package]]
name = "unicase"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6"
dependencies = [
"version_check 0.9.3",
]
[[package]]
name = "unicode-bidi"
version = "0.3.4"
@ -4683,6 +4798,35 @@ dependencies = [
"try-lock",
]
[[package]]
name = "warp"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "332d47745e9a0c38636dbd454729b147d16bd1ed08ae67b3ab281c4506771054"
dependencies = [
"bytes 1.0.1",
"futures",
"headers",
"http",
"hyper 0.14.8",
"log 0.4.14",
"mime 0.3.16",
"mime_guess",
"multipart",
"percent-encoding 2.1.0",
"pin-project 1.0.5",
"scoped-tls",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-stream",
"tokio-tungstenite 0.13.0",
"tokio-util",
"tower-service",
"tracing",
]
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"

View File

@ -65,6 +65,7 @@ tracing-subscriber = { version = "0.2", default-features = false, features = [ "
url = { version = "2", features = [ "serde" ] }
uuid = { version = "0.8", features = [ "serde", "v4" ] }
void = "1"
warp = "0.3"
[target.'cfg(not(windows))'.dependencies]
tokio-tar = "0.3"

View File

@ -13,13 +13,15 @@
#![allow(non_snake_case)]
use anyhow::{bail, Context, Result};
use futures::{StreamExt, TryStreamExt};
use libp2p::core::multiaddr::Protocol;
use libp2p::core::Multiaddr;
use libp2p::Swarm;
use prettytable::{row, Table};
use std::env;
use std::error::Error;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::{env, fmt};
use structopt::clap;
use structopt::clap::ErrorKind;
use swap::asb::command::{parse_args, Arguments, Command};
@ -27,6 +29,7 @@ use swap::asb::config::{
initial_setup, query_user_for_initial_config, read_config, Config, ConfigNotInitialized,
};
use swap::database::Database;
use swap::kraken::PriceUpdates;
use swap::monero::Amount;
use swap::network::swarm;
use swap::protocol::alice;
@ -37,6 +40,7 @@ use swap::tor::AuthenticatedClient;
use swap::{asb, bitcoin, kraken, monero, tor};
use tracing::{debug, info, warn};
use tracing_subscriber::filter::LevelFilter;
use warp::{Filter, Reply};
#[macro_use]
extern crate prettytable;
@ -127,6 +131,14 @@ async fn main() -> Result<()> {
let kraken_price_updates = kraken::connect()?;
let updates = kraken_price_updates.clone();
let latest_rate = warp::get()
.and(warp::path!("api" / "rate" / "btc-xmr"))
.map(move || latest_rate(updates.clone()));
tokio::spawn(async move {
warp::serve(latest_rate).run(([127, 0, 0, 1], 3030)).await;
});
// setup Tor hidden services
let tor_client =
tor::Client::new(config.tor.socks5_port).with_control_port(config.tor.control_port);
@ -376,3 +388,45 @@ async fn register_tor_services(
Ok(ac)
}
fn latest_rate(subscription: PriceUpdates) -> impl Reply {
let stream = subscription
.into_stream()
.map_ok(|data| {
let event = warp::sse::Event::default()
.id("rate")
.json_data(data)
.context("failed to attach json data to sse event")?;
Ok(event)
})
.map(|result| match result {
Ok(Ok(ok)) => Ok(ok),
Ok(Err(e)) => Err(e),
Err(e) => Err(e),
})
.err_into::<RateStreamError>();
warp::sse::reply(warp::sse::keep_alive().stream(stream))
}
#[derive(Debug)]
struct RateStreamError(anyhow::Error);
impl fmt::Display for RateStreamError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:#}", self.0)
}
}
impl Error for RateStreamError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
self.0.source()
}
}
impl From<anyhow::Error> for RateStreamError {
fn from(e: anyhow::Error) -> Self {
RateStreamError(e)
}
}

View File

@ -1,6 +1,7 @@
use anyhow::{anyhow, Context, Result};
use futures::{SinkExt, StreamExt, TryStreamExt};
use futures::{stream, SinkExt, Stream, StreamExt, TryStreamExt};
use serde::Deserialize;
use serde::Serialize;
use std::convert::{Infallible, TryFrom};
use std::sync::Arc;
use std::time::Duration;
@ -83,9 +84,31 @@ impl PriceUpdates {
pub fn latest_update(&mut self) -> PriceUpdate {
self.inner.borrow().clone()
}
pub fn into_stream(self) -> impl Stream<Item = Result<PriceUpdate>> {
stream::try_unfold(self.inner, |mut receiver| async move {
receiver
.changed()
.await
.context("failed to receive latest rate update")?;
let latest_rate = receiver
.borrow()
.clone()
.map_err(|_e| Error::NotYetAvailable);
Ok(Some((latest_rate, receiver)))
})
}
}
#[derive(Clone, Debug, thiserror::Error)]
impl From<watch::Receiver<PriceUpdate>> for PriceUpdates {
fn from(inner: watch::Receiver<PriceUpdate>) -> Self {
Self { inner }
}
}
#[derive(Clone, Debug, thiserror::Error, Serialize)]
pub enum Error {
#[error("Rate is not yet available")]
NotYetAvailable,
@ -248,9 +271,10 @@ mod wire {
}
/// Represents an update within the price ticker.
#[derive(Clone, Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(try_from = "TickerUpdate")]
pub struct PriceUpdate {
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
pub ask: bitcoin::Amount,
}

View File

@ -122,6 +122,8 @@ impl EventLoop {
}
}
SwarmEvent::Behaviour(OutEvent::TransferProofReceived { msg, channel, peer }) => {
tracing::info!(%msg.swap_id, %peer, "Received transfer proof msg for");
let swap_id = msg.swap_id;
if peer != self.alice_peer_id {