diff --git a/Cargo.lock b/Cargo.lock index a92f090a..548ae6c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -474,6 +474,16 @@ dependencies = [ "serde", ] +[[package]] +name = "buf_redux" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b953a6887648bb07a535631f2bc00fbdb2a2216f135552cb3f534ed136b9c07f" +dependencies = [ + "memchr", + "safemem", +] + [[package]] name = "bumpalo" version = "3.6.1" @@ -1338,6 +1348,31 @@ dependencies = [ "ahash", ] +[[package]] +name = "headers" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0b7591fb62902706ae8e7aaff416b1b0fa2c0fd0878b46dc13baa3712d8a855" +dependencies = [ + "base64 0.13.0", + "bitflags", + "bytes 1.0.1", + "headers-core", + "http", + "mime 0.3.16", + "sha-1", + "time 0.1.43", +] + +[[package]] +name = "headers-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +dependencies = [ + "http", +] + [[package]] name = "heck" version = "0.3.2" @@ -1474,7 +1509,7 @@ dependencies = [ "time 0.1.43", "traitobject", "typeable", - "unicase", + "unicase 1.4.2", "url 1.7.2", ] @@ -2152,6 +2187,16 @@ version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" +[[package]] +name = "mime_guess" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2684d4c2e97d99848d30b324b00c8fcc7e5c897b7cbb5819b09e7c90e8baf212" +dependencies = [ + "mime 0.3.16", + "unicase 2.6.0", +] + [[package]] name = "minicbor" version = "0.8.0" @@ -2322,6 +2367,24 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +[[package]] +name = "multipart" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d050aeedc89243f5347c3e237e3e13dc76fbe4ae3742a57b94dc14f69acf76d4" +dependencies = [ + "buf_redux", + "httparse", + "log 0.4.14", + "mime 0.3.16", + "mime_guess", + "quick-error 1.2.3", + "rand 0.7.3", + "safemem", + "tempfile", + "twoway", +] + [[package]] name = "multistream-select" version = "0.10.2" @@ -3934,7 +3997,7 @@ dependencies = [ "tokio", "tokio-socks", "tokio-tar", - "tokio-tungstenite", + "tokio-tungstenite 0.14.0", "tokio-util", "toml", "torut", @@ -3945,6 +4008,7 @@ dependencies = [ "url 2.2.2", "uuid", "void", + "warp", "zip", ] @@ -4238,6 +4302,19 @@ dependencies = [ "xattr", ] +[[package]] +name = "tokio-tungstenite" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1a5f475f1b9d077ea1017ecbc60890fda8e54942d680ca0b1d2b47cfa2d861b" +dependencies = [ + "futures-util", + "log 0.4.14", + "pin-project 1.0.5", + "tokio", + "tungstenite 0.12.0", +] + [[package]] name = "tokio-tungstenite" version = "0.14.0" @@ -4250,7 +4327,7 @@ dependencies = [ "rustls 0.19.0", "tokio", "tokio-rustls", - "tungstenite", + "tungstenite 0.13.0", "webpki", "webpki-roots 0.21.0", ] @@ -4309,6 +4386,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d" dependencies = [ "cfg-if 1.0.0", + "log 0.4.14", "pin-project-lite 0.2.6", "tracing-attributes", "tracing-core", @@ -4456,6 +4534,25 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" +[[package]] +name = "tungstenite" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ada8297e8d70872fa9a551d93250a9f407beb9f37ef86494eb20012a2ff7c24" +dependencies = [ + "base64 0.13.0", + "byteorder", + "bytes 1.0.1", + "http", + "httparse", + "input_buffer", + "log 0.4.14", + "rand 0.8.3", + "sha-1", + "url 2.2.2", + "utf-8", +] + [[package]] name = "tungstenite" version = "0.13.0" @@ -4479,6 +4576,15 @@ dependencies = [ "webpki-roots 0.21.0", ] +[[package]] +name = "twoway" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59b11b2b5241ba34be09c3cc85a36e56e48f9888862e19cedf23336d35316ed1" +dependencies = [ + "memchr", +] + [[package]] name = "typeable" version = "0.1.2" @@ -4518,6 +4624,15 @@ dependencies = [ "version_check 0.1.5", ] +[[package]] +name = "unicase" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" +dependencies = [ + "version_check 0.9.3", +] + [[package]] name = "unicode-bidi" version = "0.3.4" @@ -4683,6 +4798,35 @@ dependencies = [ "try-lock", ] +[[package]] +name = "warp" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "332d47745e9a0c38636dbd454729b147d16bd1ed08ae67b3ab281c4506771054" +dependencies = [ + "bytes 1.0.1", + "futures", + "headers", + "http", + "hyper 0.14.8", + "log 0.4.14", + "mime 0.3.16", + "mime_guess", + "multipart", + "percent-encoding 2.1.0", + "pin-project 1.0.5", + "scoped-tls", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-stream", + "tokio-tungstenite 0.13.0", + "tokio-util", + "tower-service", + "tracing", +] + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" diff --git a/swap/Cargo.toml b/swap/Cargo.toml index ba8ed7b3..f5575eb7 100644 --- a/swap/Cargo.toml +++ b/swap/Cargo.toml @@ -65,6 +65,7 @@ tracing-subscriber = { version = "0.2", default-features = false, features = [ " url = { version = "2", features = [ "serde" ] } uuid = { version = "0.8", features = [ "serde", "v4" ] } void = "1" +warp = "0.3" [target.'cfg(not(windows))'.dependencies] tokio-tar = "0.3" diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index eea3bcce..6534ba3e 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -13,13 +13,15 @@ #![allow(non_snake_case)] use anyhow::{bail, Context, Result}; +use futures::{StreamExt, TryStreamExt}; use libp2p::core::multiaddr::Protocol; use libp2p::core::Multiaddr; use libp2p::Swarm; use prettytable::{row, Table}; -use std::env; +use std::error::Error; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::Arc; +use std::{env, fmt}; use structopt::clap; use structopt::clap::ErrorKind; use swap::asb::command::{parse_args, Arguments, Command}; @@ -27,6 +29,7 @@ use swap::asb::config::{ initial_setup, query_user_for_initial_config, read_config, Config, ConfigNotInitialized, }; use swap::database::Database; +use swap::kraken::PriceUpdates; use swap::monero::Amount; use swap::network::swarm; use swap::protocol::alice; @@ -37,6 +40,7 @@ use swap::tor::AuthenticatedClient; use swap::{asb, bitcoin, kraken, monero, tor}; use tracing::{debug, info, warn}; use tracing_subscriber::filter::LevelFilter; +use warp::{Filter, Reply}; #[macro_use] extern crate prettytable; @@ -127,6 +131,14 @@ async fn main() -> Result<()> { let kraken_price_updates = kraken::connect()?; + let updates = kraken_price_updates.clone(); + let latest_rate = warp::get() + .and(warp::path!("api" / "rate" / "btc-xmr")) + .map(move || latest_rate(updates.clone())); + tokio::spawn(async move { + warp::serve(latest_rate).run(([127, 0, 0, 1], 3030)).await; + }); + // setup Tor hidden services let tor_client = tor::Client::new(config.tor.socks5_port).with_control_port(config.tor.control_port); @@ -376,3 +388,45 @@ async fn register_tor_services( Ok(ac) } + +fn latest_rate(subscription: PriceUpdates) -> impl Reply { + let stream = subscription + .into_stream() + .map_ok(|data| { + let event = warp::sse::Event::default() + .id("rate") + .json_data(data) + .context("failed to attach json data to sse event")?; + + Ok(event) + }) + .map(|result| match result { + Ok(Ok(ok)) => Ok(ok), + Ok(Err(e)) => Err(e), + Err(e) => Err(e), + }) + .err_into::(); + + warp::sse::reply(warp::sse::keep_alive().stream(stream)) +} + +#[derive(Debug)] +struct RateStreamError(anyhow::Error); + +impl fmt::Display for RateStreamError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:#}", self.0) + } +} + +impl Error for RateStreamError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + self.0.source() + } +} + +impl From for RateStreamError { + fn from(e: anyhow::Error) -> Self { + RateStreamError(e) + } +} diff --git a/swap/src/kraken.rs b/swap/src/kraken.rs index 001696ff..9ab0ae3f 100644 --- a/swap/src/kraken.rs +++ b/swap/src/kraken.rs @@ -1,6 +1,7 @@ use anyhow::{anyhow, Context, Result}; -use futures::{SinkExt, StreamExt, TryStreamExt}; +use futures::{stream, SinkExt, Stream, StreamExt, TryStreamExt}; use serde::Deserialize; +use serde::Serialize; use std::convert::{Infallible, TryFrom}; use std::sync::Arc; use std::time::Duration; @@ -83,9 +84,31 @@ impl PriceUpdates { pub fn latest_update(&mut self) -> PriceUpdate { self.inner.borrow().clone() } + + pub fn into_stream(self) -> impl Stream> { + stream::try_unfold(self.inner, |mut receiver| async move { + receiver + .changed() + .await + .context("failed to receive latest rate update")?; + + let latest_rate = receiver + .borrow() + .clone() + .map_err(|_e| Error::NotYetAvailable); + + Ok(Some((latest_rate, receiver))) + }) + } } -#[derive(Clone, Debug, thiserror::Error)] +impl From> for PriceUpdates { + fn from(inner: watch::Receiver) -> Self { + Self { inner } + } +} + +#[derive(Clone, Debug, thiserror::Error, Serialize)] pub enum Error { #[error("Rate is not yet available")] NotYetAvailable, @@ -248,9 +271,10 @@ mod wire { } /// Represents an update within the price ticker. - #[derive(Clone, Debug, Deserialize)] + #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(try_from = "TickerUpdate")] pub struct PriceUpdate { + #[serde(with = "::bitcoin::util::amount::serde::as_sat")] pub ask: bitcoin::Amount, } diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index 4fe347bf..e396684c 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -122,6 +122,8 @@ impl EventLoop { } } SwarmEvent::Behaviour(OutEvent::TransferProofReceived { msg, channel, peer }) => { + tracing::info!(%msg.swap_id, %peer, "Received transfer proof msg for"); + let swap_id = msg.swap_id; if peer != self.alice_peer_id {