diff --git a/Cargo.lock b/Cargo.lock index 0f8cd3d2..7e6e53f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4559,9 +4559,11 @@ dependencies = [ "percent-encoding", "pin-project-lite", "socket2 0.6.0", + "system-configuration 0.6.1", "tokio", "tower-service", "tracing", + "windows-registry", ] [[package]] @@ -6219,25 +6221,31 @@ name = "monero-rpc-pool" version = "0.1.0" dependencies = [ "anyhow", + "arti-client", "axum", "chrono", "clap 4.5.41", "futures", + "http-body-util", + "hyper 1.6.0", + "hyper-util", "monero", "monero-rpc", + "native-tls", "rand 0.8.5", "regex", "serde", "serde_json", "sqlx", "tokio", + "tokio-native-tls", "tokio-test", + "tor-rtcompat", "tower 0.4.13", "tower-http 0.5.2", "tracing", "tracing-subscriber", "typeshare", - "ureq", "url", "uuid", ] @@ -12872,21 +12880,6 @@ version = "0.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae" -[[package]] -name = "ureq" -version = "2.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02d1a66277ed75f640d608235660df48c8e3c19f3b4edb6a263315626cc3c01d" -dependencies = [ - "base64 0.22.1", - "log", - "once_cell", - "rustls 0.23.29", - "rustls-pki-types", - "url", - "webpki-roots 0.26.11", -] - [[package]] name = "url" version = "2.5.4" @@ -13594,6 +13587,17 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-registry" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b8a9ed28765efc97bbc954883f4e6796c33a06546ebafacbabee9696967499e" +dependencies = [ + "windows-link", + "windows-result 0.3.4", + "windows-strings", +] + [[package]] name = "windows-result" version = "0.1.2" diff --git a/monero-rpc-pool/Cargo.toml b/monero-rpc-pool/Cargo.toml index 04554f86..9900844b 100644 --- a/monero-rpc-pool/Cargo.toml +++ b/monero-rpc-pool/Cargo.toml @@ -27,9 +27,17 @@ tower-http = { version = "0.5", features = ["cors"] } tracing = { workspace = true } tracing-subscriber = { workspace = true } typeshare = { workspace = true } -ureq = { version = "2.10", default-features = false, features = ["tls"] } url = "2.0" uuid = { workspace = true } +arti-client = { workspace = true, features = ["tokio"] } +tor-rtcompat = { workspace = true, features = ["tokio", "rustls"] } + +http-body-util = "0.1" +hyper = { version = "1", features = ["full"] } +hyper-util = { version = "0.1", features = ["full"] } +native-tls = "0.2" +tokio-native-tls = "0.3" + [dev-dependencies] tokio-test = "0.4" diff --git a/monero-rpc-pool/src/config.rs b/monero-rpc-pool/src/config.rs index fda354ff..e1375162 100644 --- a/monero-rpc-pool/src/config.rs +++ b/monero-rpc-pool/src/config.rs @@ -1,27 +1,53 @@ -use serde::{Deserialize, Serialize}; use std::path::PathBuf; -#[derive(Debug, Clone, Serialize, Deserialize)] +use crate::TorClientArc; + +#[derive(Clone)] pub struct Config { pub host: String, pub port: u16, pub data_dir: PathBuf, + pub tor_client: Option, +} + +impl std::fmt::Debug for Config { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Config") + .field("host", &self.host) + .field("port", &self.port) + .field("data_dir", &self.data_dir) + .field("tor_client", &self.tor_client.is_some()) + .finish() + } } impl Config { pub fn new_with_port(host: String, port: u16, data_dir: PathBuf) -> Self { + Self::new_with_port_and_tor_client(host, port, data_dir, None) + } + + pub fn new_with_port_and_tor_client( + host: String, + port: u16, + data_dir: 
PathBuf, + tor_client: impl Into>, + ) -> Self { Self { host, port, data_dir, + tor_client: tor_client.into(), } } - pub fn new_random_port(host: String, data_dir: PathBuf) -> Self { - Self { - host, - port: 0, - data_dir, - } + pub fn new_random_port(data_dir: PathBuf) -> Self { + Self::new_random_port_with_tor_client(data_dir, None) + } + + pub fn new_random_port_with_tor_client( + data_dir: PathBuf, + tor_client: impl Into>, + ) -> Self { + Self::new_with_port_and_tor_client("127.0.0.1".to_string(), 0, data_dir, tor_client) } } diff --git a/monero-rpc-pool/src/lib.rs b/monero-rpc-pool/src/lib.rs index 2b123df2..82e15649 100644 --- a/monero-rpc-pool/src/lib.rs +++ b/monero-rpc-pool/src/lib.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use anyhow::Result; +use arti_client::TorClient; use axum::{ routing::{any, get}, Router, @@ -8,9 +9,13 @@ use axum::{ use monero::Network; use tokio::task::JoinHandle; +use tor_rtcompat::tokio::TokioRustlsRuntime; use tower_http::cors::CorsLayer; use tracing::{error, info}; +/// Type alias for the Tor client used throughout the crate +pub type TorClientArc = Arc>; + pub trait ToNetworkString { fn to_network_string(&self) -> String; } @@ -39,7 +44,7 @@ use proxy::{proxy_handler, stats_handler}; #[derive(Clone)] pub struct AppState { pub node_pool: Arc, - pub http_client: ureq::Agent, + pub tor_client: Option, } /// Manages background tasks for the RPC pool @@ -104,17 +109,9 @@ async fn create_app_with_receiver( status_update_handle, }; - // Create shared HTTP client with connection pooling and keep-alive - // TODO: Add dangerous certificate acceptance equivalent to reqwest's danger_accept_invalid_certs(true) - let http_client = ureq::AgentBuilder::new() - .timeout(std::time::Duration::from_secs(30)) - .max_idle_connections(100) - .max_idle_connections_per_host(10) - .build(); - let app_state = AppState { node_pool, - http_client, + tor_client: config.tor_client, }; // Build the app @@ -177,14 +174,8 @@ pub async fn start_server_with_random_port( tokio::sync::broadcast::Receiver, PoolHandle, )> { - // Clone the host before moving config let host = config.host.clone(); - - // If port is 0, the system will assign a random available port - let config_with_random_port = Config::new_random_port(config.host, config.data_dir); - - let (app, status_receiver, pool_handle) = - create_app_with_receiver(config_with_random_port, network).await?; + let (app, status_receiver, pool_handle) = create_app_with_receiver(config, network).await?; // Bind to port 0 to get a random available port let listener = tokio::net::TcpListener::bind(format!("{}:0", host)).await?; @@ -209,18 +200,3 @@ pub async fn start_server_with_random_port( Ok((server_info, status_receiver, pool_handle)) } - -/// Start a server with a random port and custom data directory for library usage -/// Returns the server info with the actual port used, a receiver for pool status updates, and pool handle -pub async fn start_server_with_random_port_and_data_dir( - config: Config, - network: Network, - data_dir: std::path::PathBuf, -) -> Result<( - ServerInfo, - tokio::sync::broadcast::Receiver, - PoolHandle, -)> { - let config_with_data_dir = Config::new_random_port(config.host, data_dir); - start_server_with_random_port(config_with_data_dir, network).await -} diff --git a/monero-rpc-pool/src/main.rs b/monero-rpc-pool/src/main.rs index 2b03fc08..a2e2db68 100644 --- a/monero-rpc-pool/src/main.rs +++ b/monero-rpc-pool/src/main.rs @@ -1,3 +1,4 @@ +use arti_client::{TorClient, TorClientConfig}; use clap::Parser; use 
monero_rpc_pool::{config::Config, run_server}; use tracing::info; @@ -47,6 +48,11 @@ struct Args { #[arg(short, long)] #[arg(help = "Enable verbose logging")] verbose: bool, + + #[arg(short, long)] + #[arg(help = "Enable Tor routing")] + #[arg(default_value = "true")] + tor: bool, } #[tokio::main] @@ -54,16 +60,46 @@ async fn main() -> Result<(), Box> { let args = Args::parse(); tracing_subscriber::fmt() - .with_env_filter(EnvFilter::new("trace")) + .with_env_filter(EnvFilter::new("info")) .with_target(false) .with_file(true) .with_line_number(true) .init(); - let config = Config::new_with_port( + let tor_client = if args.tor { + let config = TorClientConfig::default(); + let runtime = tor_rtcompat::tokio::TokioRustlsRuntime::current() + .expect("We are always running with tokio"); + + let client = TorClient::with_runtime(runtime) + .config(config) + .create_unbootstrapped_async() + .await?; + + let client = std::sync::Arc::new(client); + + let client_clone = client.clone(); + tokio::spawn(async move { + match client_clone.bootstrap().await { + Ok(()) => { + info!("Tor client successfully bootstrapped"); + } + Err(e) => { + tracing::error!("Failed to bootstrap Tor client: {}. Tor functionality will be unavailable.", e); + } + } + }); + + Some(client) + } else { + None + }; + + let config = Config::new_with_port_and_tor_client( args.host, args.port, std::env::temp_dir().join("monero-rpc-pool"), + tor_client, ); info!( diff --git a/monero-rpc-pool/src/pool.rs b/monero-rpc-pool/src/pool.rs index 0dd17f10..c033ab38 100644 --- a/monero-rpc-pool/src/pool.rs +++ b/monero-rpc-pool/src/pool.rs @@ -1,6 +1,9 @@ use anyhow::{Context, Result}; +use std::collections::VecDeque; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; use tokio::sync::broadcast; -use tracing::{debug, warn}; +use tracing::warn; use typeshare::typeshare; use crate::database::Database; @@ -16,6 +19,7 @@ pub struct PoolStatus { #[typeshare(serialized_as = "number")] pub unsuccessful_health_checks: u64, pub top_reliable_nodes: Vec, + pub bandwidth_kb_per_sec: f64, } #[derive(Debug, Clone, serde::Serialize)] @@ -26,10 +30,69 @@ pub struct ReliableNodeInfo { pub avg_latency_ms: Option, } +#[derive(Debug)] +struct BandwidthEntry { + timestamp: Instant, + bytes: u64, +} + +#[derive(Debug)] +struct BandwidthTracker { + entries: VecDeque, + window_duration: Duration, +} + +impl BandwidthTracker { + const WINDOW_DURATION: Duration = Duration::from_secs(60 * 3); + + fn new() -> Self { + Self { + entries: VecDeque::new(), + window_duration: Self::WINDOW_DURATION, + } + } + + fn record_bytes(&mut self, bytes: u64) { + let now = Instant::now(); + self.entries.push_back(BandwidthEntry { + timestamp: now, + bytes, + }); + + // Clean up old entries + let cutoff = now - self.window_duration; + while let Some(front) = self.entries.front() { + if front.timestamp < cutoff { + self.entries.pop_front(); + } else { + break; + } + } + } + + fn get_kb_per_sec(&self) -> f64 { + if self.entries.len() < 5 { + return 0.0; + } + + let total_bytes: u64 = self.entries.iter().map(|e| e.bytes).sum(); + let now = Instant::now(); + let oldest_time = self.entries.front().unwrap().timestamp; + let duration_secs = (now - oldest_time).as_secs_f64(); + + if duration_secs > 0.0 { + (total_bytes as f64 / 1024.0) / duration_secs + } else { + 0.0 + } + } +} + pub struct NodePool { db: Database, network: String, status_sender: broadcast::Sender, + bandwidth_tracker: Arc>, } impl NodePool { @@ -39,6 +102,7 @@ impl NodePool { db, network, status_sender, + 
bandwidth_tracker: Arc::new(Mutex::new(BandwidthTracker::new())), }; (pool, status_receiver) } @@ -63,13 +127,19 @@ impl NodePool { Ok(()) } + pub fn record_bandwidth(&self, bytes: u64) { + if let Ok(mut tracker) = self.bandwidth_tracker.lock() { + tracker.record_bytes(bytes); + } + } + pub async fn publish_status_update(&self) -> Result<()> { let status = self.get_current_status().await?; if let Err(e) = self.status_sender.send(status.clone()) { warn!("Failed to send status update: {}", e); } else { - debug!(?status, "Sent status update"); + tracing::debug!(?status, "Sent status update"); } Ok(()) @@ -81,6 +151,12 @@ impl NodePool { let (successful_checks, unsuccessful_checks) = self.db.get_health_check_stats(&self.network).await?; + let bandwidth_kb_per_sec = if let Ok(tracker) = self.bandwidth_tracker.lock() { + tracker.get_kb_per_sec() + } else { + 0.0 + }; + let top_reliable_nodes = reliable_nodes .into_iter() .take(5) @@ -97,6 +173,7 @@ impl NodePool { successful_health_checks: successful_checks, unsuccessful_health_checks: unsuccessful_checks, top_reliable_nodes, + bandwidth_kb_per_sec, }) } @@ -105,9 +182,10 @@ impl NodePool { pub async fn get_top_reliable_nodes(&self, limit: usize) -> Result> { use rand::seq::SliceRandom; - debug!( + tracing::debug!( "Getting top reliable nodes for network {} (target: {})", - self.network, limit + self.network, + limit ); let available_nodes = self @@ -149,7 +227,7 @@ impl NodePool { selected_nodes.push(node); } - debug!( + tracing::debug!( "Pool size: {} nodes for network {} (target: {})", selected_nodes.len(), self.network, @@ -158,53 +236,4 @@ impl NodePool { Ok(selected_nodes) } - - pub async fn get_pool_stats(&self) -> Result { - let (total, reachable, reliable) = self.db.get_node_stats(&self.network).await?; - let reliable_nodes = self.db.get_reliable_nodes(&self.network).await?; - - let avg_reliable_latency = if reliable_nodes.is_empty() { - None - } else { - let total_latency: f64 = reliable_nodes - .iter() - .filter_map(|node| node.health.avg_latency_ms) - .sum(); - let count = reliable_nodes - .iter() - .filter(|node| node.health.avg_latency_ms.is_some()) - .count(); - - if count > 0 { - Some(total_latency / count as f64) - } else { - None - } - }; - - Ok(PoolStats { - total_nodes: total, - reachable_nodes: reachable, - reliable_nodes: reliable, - avg_reliable_latency_ms: avg_reliable_latency, - }) - } -} - -#[derive(Debug)] -pub struct PoolStats { - pub total_nodes: i64, - pub reachable_nodes: i64, - pub reliable_nodes: i64, - pub avg_reliable_latency_ms: Option, // TOOD: Why is this an Option, we hate Options -} - -impl PoolStats { - pub fn health_percentage(&self) -> f64 { - if self.total_nodes == 0 { - 0.0 - } else { - (self.reachable_nodes as f64 / self.total_nodes as f64) * 100.0 - } - } } diff --git a/monero-rpc-pool/src/proxy.rs b/monero-rpc-pool/src/proxy.rs index 28df2d79..efc75121 100644 --- a/monero-rpc-pool/src/proxy.rs +++ b/monero-rpc-pool/src/proxy.rs @@ -1,28 +1,471 @@ use axum::{ body::Body, - extract::State, - http::{HeaderMap, Method, StatusCode}, + extract::{Request, State}, + http::{request::Parts, response, StatusCode}, response::Response, }; -use serde_json::json; -use std::io::Read; -use std::time::Instant; -use tracing::{debug, error, info_span, Instrument}; -use uuid::Uuid; +use http_body_util::BodyExt; +use hyper_util::rt::TokioIo; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::net::TcpStream; +use tokio_native_tls::native_tls::TlsConnector; +use tracing::{error, info_span, Instrument}; use 
crate::AppState; -fn display_node(node: &(String, String, i64)) -> String { - format!("{}://{}:{}", node.0, node.1, node.2) +/// Trait alias for a stream that can be used with hyper +trait HyperStream: AsyncRead + AsyncWrite + Unpin + Send {} +impl HyperStream for T {} + +#[axum::debug_handler] +pub async fn proxy_handler(State(state): State, request: Request) -> Response { + static POOL_SIZE: usize = 10; + + // Get the pool of nodes + let available_pool = state + .node_pool + .get_top_reliable_nodes(POOL_SIZE) + .await + .map_err(|e| HandlerError::PoolError(e.to_string())) + .map(|nodes| { + let pool: Vec<(String, String, i64)> = nodes + .into_iter() + .map(|node| (node.scheme, node.host, node.port as i64)) + .collect(); + + pool + }); + + let (request, pool) = match available_pool { + Ok(pool) => { + match CloneableRequest::from_request(request).await { + Ok(cloneable_request) => (cloneable_request, pool), + Err(e) => { + return Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from(e.to_string())) + .unwrap_or_else(|_| Response::new(Body::empty())); + } + } + } + Err(e) => { + // If we can't get a pool, return an error immediately + return Response::builder() + .status(StatusCode::SERVICE_UNAVAILABLE) + .body(Body::from(e.to_string())) + .unwrap_or_else(|_| Response::new(Body::empty())); + } + }; + + let uri = request.uri().to_string(); + let method = request.jsonrpc_method(); + match proxy_to_multiple_nodes(&state, request, pool) + .instrument(info_span!("request", uri = uri, method = method.as_deref())) + .await + { + Ok(response) => response, + Err(error) => error.to_response(), + } +} + +/// Given a Vec of nodes, proxy the given request to multiple nodes until we get a successful response +async fn proxy_to_multiple_nodes( + state: &AppState, + request: CloneableRequest, + nodes: Vec<(String, String, i64)>, +) -> Result { + if nodes.is_empty() { + return Err(HandlerError::NoNodes); + } + + let mut collected_errors: Vec<((String, String, i64), HandlerError)> = Vec::new(); + + fn push_error( + errors: &mut Vec<((String, String, i64), HandlerError)>, + node: (String, String, i64), + error: HandlerError, + ) { + errors.push((node, error)); + } + + // Go through the nodes one by one, and proxy the request to each node + // until we get a successful response or we run out of nodes + // Success is defined as either: + // - a raw HTTP response with a 200 response code + // - a JSON-RPC response with status code 200 and no error field + for node in nodes { + // Node attempt logging without creating spans to reduce overhead + let node_uri = display_node(&node); + + // Start timing the request + let latency = std::time::Instant::now(); + + let response = match proxy_to_single_node(request.clone(), &node, state.tor_client.clone()) + .instrument(info_span!( + "connection", + node = node_uri, + tor = state.tor_client.is_some(), + )) + .await + { + Ok(response) => response, + Err(e) => { + push_error(&mut collected_errors, node, HandlerError::PhyiscalError(e)); + continue; + } + }; + + // Calculate the latency + let latency = latency.elapsed().as_millis() as f64; + + // Convert response to cloneable to avoid consumption issues + let cloneable_response = CloneableResponse::from_response(response) + .await + .map_err(|e| { + HandlerError::CloneRequestError(format!("Failed to buffer response: {}", e)) + })?; + + let error = match cloneable_response.get_jsonrpc_error() { + Some(error) => { + // Check if we have already got two previous JSON-RPC errors + // If we did, we 
assume there is a reason for it + // We return the response as is. + if collected_errors + .iter() + .filter(|(_, error)| matches!(error, HandlerError::JsonRpcError(_))) + .count() + >= 2 + { + return Ok(cloneable_response.into_response()); + } + + Some(HandlerError::JsonRpcError(error)) + } + None if cloneable_response.status().is_client_error() + || cloneable_response.status().is_server_error() => + { + Some(HandlerError::HttpError(cloneable_response.status())) + } + _ => None, + }; + + match error { + Some(error) => { + tracing::info!("Proxy request to {} failed: {}", node_uri, error); + push_error(&mut collected_errors, node, error); + } + None => { + let response_size_bytes = cloneable_response.body.len() as u64; + tracing::info!( + "Proxy request to {} succeeded with size {}kb", + node_uri, + (response_size_bytes as f64 / 1024.0) + ); + + // Record bandwidth usage + state.node_pool.record_bandwidth(response_size_bytes); + + // Only record errors if we have gotten a successful response + // This helps prevent logging errors if its our likely our fault (no internet) + for (node, _) in collected_errors.iter() { + record_failure(&state, &node.0, &node.1, node.2).await; + } + + // Record the success with actual latency + record_success(&state, &node.0, &node.1, node.2, latency).await; + + // Finally return the successful response + return Ok(cloneable_response.into_response()); + } + } + } + + Err(HandlerError::AllRequestsFailed(collected_errors)) +} + +/// Wraps a stream with TLS if HTTPS is being used +async fn maybe_wrap_with_tls( + stream: impl AsyncRead + AsyncWrite + Unpin + Send + 'static, + scheme: &str, + host: &str, +) -> Result, SingleRequestError> { + if scheme == "https" { + let tls_connector = TlsConnector::builder().build().map_err(|e| { + SingleRequestError::ConnectionError(format!("TLS connector error: {}", e)) + })?; + let tls_connector = tokio_native_tls::TlsConnector::from(tls_connector); + + let tls_stream = tls_connector.connect(host, stream).await.map_err(|e| { + SingleRequestError::ConnectionError(format!("TLS connection error: {}", e)) + })?; + + Ok(Box::new(tls_stream)) + } else { + Ok(Box::new(stream)) + } +} + +/// Proxies a singular axum::Request to a single node. 
+/// Errors if we get a physical connection error +/// +/// Important: Does NOT error if the response is a HTTP error or a JSON-RPC error +/// The caller is responsible for checking the response status and body for errors +async fn proxy_to_single_node( + request: CloneableRequest, + node: &(String, String, i64), + tor_client: Option, +) -> Result { + if request.clearnet_whitelisted() { + tracing::info!("Request is whitelisted, sending over clearnet"); + } + + let response = match tor_client { + // If Tor client is ready for traffic, use it + Some(tor_client) + if tor_client.bootstrap_status().ready_for_traffic() + // If the request is whitelisted, we don't want to use Tor + && !request.clearnet_whitelisted() => + { + let stream = tor_client + .connect(format!("{}:{}", node.1, node.2)) + .await + .map_err(|e| SingleRequestError::ConnectionError(e.to_string()))?; + + // Wrap with TLS if using HTTPS + let stream = maybe_wrap_with_tls(stream, &node.0, &node.1).await?; + + let (mut sender, conn) = hyper::client::conn::http1::handshake(TokioIo::new(stream)) + .await + .map_err(|e| SingleRequestError::ConnectionError(e.to_string()))?; + + tracing::info!( + "Connected to node via Tor{}", + if node.0 == "https" { " with TLS" } else { "" } + ); + + tokio::task::spawn(async move { + if let Err(err) = conn.await { + println!("Connection failed: {:?}", err); + } + }); + + // Forward the request to the node + // No need to rewrite the URI because the request.uri() is relative + sender + .send_request(request.to_request()) + .await + .map_err(|e| SingleRequestError::SendRequestError(e.to_string()))? + } + // Otherwise send over clearnet + _ => { + let stream = TcpStream::connect(format!("{}:{}", node.1, node.2)) + .await + .map_err(|e| SingleRequestError::ConnectionError(e.to_string()))?; + + // Wrap with TLS if using HTTPS + let stream = maybe_wrap_with_tls(stream, &node.0, &node.1).await?; + + let (mut sender, conn) = hyper::client::conn::http1::handshake(TokioIo::new(stream)) + .await + .map_err(|e| SingleRequestError::ConnectionError(e.to_string()))?; + + tracing::info!( + "Connected to node via clearnet{}", + if node.0 == "https" { " with TLS" } else { "" } + ); + + tokio::task::spawn(async move { + if let Err(err) = conn.await { + println!("Connection failed: {:?}", err); + } + }); + + sender + .send_request(request.to_request()) + .await + .map_err(|e| SingleRequestError::SendRequestError(e.to_string()))? + } + }; + + // Convert hyper Response to axum Response + let (parts, body) = response.into_parts(); + let body_bytes = body + .collect() + .await + .map_err(|e| SingleRequestError::CollectResponseError(e.to_string()))? + .to_bytes(); + let axum_body = Body::from(body_bytes); + + let response = Response::from_parts(parts, axum_body); + + Ok(response) +} + +fn get_jsonrpc_error(body: &[u8]) -> Option { + // Try to parse as JSON + if let Ok(json) = serde_json::from_slice::(body) { + // Check if there's an "error" field + return json + .get("error") + .and_then(|e| e.as_str().map(|s| s.to_string())); + } + + // If we can't parse JSON, treat it as an error + None +} + +trait RequestDifferentiator { + /// Can this be request be proxied over clearnet? + fn clearnet_whitelisted(&self) -> bool; +} + +impl RequestDifferentiator for CloneableRequest { + fn clearnet_whitelisted(&self) -> bool { + match self.uri().to_string().as_str() { + // Downloading blocks does not reveal any metadata other + // than perhaps how far the wallet is behind or the restore + // height. 
+ "/getblocks.bin" => true, + _ => false, + } + } +} + +/// A cloneable request that buffers the body in memory +#[derive(Clone)] +pub struct CloneableRequest { + parts: Parts, + pub body: Vec, +} + +/// A cloneable response that buffers the body in memory +#[derive(Clone)] +pub struct CloneableResponse { + parts: response::Parts, + body: Vec, +} + +impl CloneableRequest { + /// Convert a streaming request into a cloneable one by buffering the body + pub async fn from_request(request: Request) -> Result { + let (parts, body) = request.into_parts(); + let body_bytes = body.collect().await?.to_bytes().to_vec(); + + Ok(CloneableRequest { + parts, + body: body_bytes, + }) + } + + /// Convert back to a regular Request + pub fn into_request(self) -> Request { + Request::from_parts(self.parts, Body::from(self.body)) + } + + /// Get a new Request without consuming self + pub fn to_request(&self) -> Request { + Request::from_parts(self.parts.clone(), Body::from(self.body.clone())) + } + + /// Get the URI from the request + pub fn uri(&self) -> &axum::http::Uri { + &self.parts.uri + } + + /// Get the JSON-RPC method from the request body + pub fn jsonrpc_method(&self) -> Option { + static JSON_RPC_METHOD_KEY: &str = "method"; + + match serde_json::from_slice::(&self.body) { + Ok(json) => json + .get(JSON_RPC_METHOD_KEY) + .and_then(|m| m.as_str().map(|s| s.to_string())), + Err(_) => None, + } + } +} + +impl CloneableResponse { + /// Convert a streaming response into a cloneable one by buffering the body + pub async fn from_response(response: Response) -> Result { + let (parts, body) = response.into_parts(); + let body_bytes = body.collect().await?.to_bytes().to_vec(); + + Ok(CloneableResponse { + parts, + body: body_bytes, + }) + } + + /// Convert back to a regular Response + pub fn into_response(self) -> Response { + Response::from_parts(self.parts, Body::from(self.body)) + } + + /// Get a new Response without consuming self + pub fn to_response(&self) -> Response { + Response::from_parts(self.parts.clone(), Body::from(self.body.clone())) + } + + /// Get the status code + pub fn status(&self) -> StatusCode { + self.parts.status + } + + /// Check for JSON-RPC errors without consuming the response + pub fn get_jsonrpc_error(&self) -> Option { + get_jsonrpc_error(&self.body) + } +} + +impl HandlerError { + /// Convert HandlerError to an HTTP response + fn to_response(&self) -> Response { + let (status_code, error_message) = match self { + HandlerError::NoNodes => (StatusCode::SERVICE_UNAVAILABLE, "No nodes available"), + HandlerError::PoolError(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Pool error"), + HandlerError::PhyiscalError(_) => (StatusCode::BAD_GATEWAY, "Connection error"), + HandlerError::HttpError(status) => (*status, "HTTP error"), + HandlerError::JsonRpcError(_) => (StatusCode::BAD_GATEWAY, "JSON-RPC error"), + HandlerError::AllRequestsFailed(_) => (StatusCode::BAD_GATEWAY, "All requests failed"), + HandlerError::CloneRequestError(_) => ( + StatusCode::INTERNAL_SERVER_ERROR, + "Request processing error", + ), + }; + + let error_json = serde_json::json!({ + "error": { + "code": status_code.as_u16(), + "message": error_message, + "details": self.to_string() + } + }); + + Response::builder() + .status(status_code) + .header("content-type", "application/json") + .body(Body::from(error_json.to_string())) + .unwrap_or_else(|_| Response::new(Body::empty())) + } } #[derive(Debug, Clone)] enum HandlerError { NoNodes, PoolError(String), - RequestError(String), + PhyiscalError(SingleRequestError), + 
HttpError(axum::http::StatusCode), JsonRpcError(String), - AllRequestsFailed(Vec<(String, String)>), + AllRequestsFailed(Vec<((String, String, i64), HandlerError)>), + CloneRequestError(String), +} + +#[derive(Debug, Clone)] +enum SingleRequestError { + ConnectionError(String), + SendRequestError(String), + CollectResponseError(String), } impl std::fmt::Display for HandlerError { @@ -30,7 +473,7 @@ impl std::fmt::Display for HandlerError { match self { HandlerError::NoNodes => write!(f, "No nodes available"), HandlerError::PoolError(msg) => write!(f, "Pool error: {}", msg), - HandlerError::RequestError(msg) => write!(f, "Request error: {}", msg), + HandlerError::PhyiscalError(msg) => write!(f, "Request error: {}", msg), HandlerError::JsonRpcError(msg) => write!(f, "JSON-RPC error: {}", msg), HandlerError::AllRequestsFailed(errors) => { write!(f, "All requests failed: [")?; @@ -38,142 +481,31 @@ impl std::fmt::Display for HandlerError { if i > 0 { write!(f, ", ")?; } - write!(f, "{}: {}", node, error)?; + let node_str = display_node(node); + write!(f, "{}: {}", node_str, error)?; } write!(f, "]") } + HandlerError::CloneRequestError(msg) => write!(f, "Clone request error: {}", msg), + HandlerError::HttpError(msg) => write!(f, "HTTP error: {}", msg), } } } -fn is_jsonrpc_error(body: &[u8]) -> bool { - // Try to parse as JSON - if let Ok(json) = serde_json::from_slice::(body) { - // Check if there's an "error" field - return json.get("error").is_some(); - } - - // If we can't parse JSON, treat it as an error - true -} - -fn extract_jsonrpc_method(body: &[u8]) -> Option { - if let Ok(json) = serde_json::from_slice::(body) { - if let Some(method) = json.get("method").and_then(|m| m.as_str()) { - return Some(method.to_string()); - } - } - None -} - -async fn raw_http_request( - client: &ureq::Agent, - node_url: (String, String, i64), - path: &str, - method: &str, - headers: &HeaderMap, - body: Option<&[u8]>, -) -> Result { - let (scheme, host, port) = &node_url; - let url = format!("{}://{}:{}{}", scheme, host, port, path); - - // Clone the agent and convert borrowed data to owned for 'static lifetime - let client = client.clone(); - let method = method.to_string(); - let body = body.map(|b| b.to_vec()); - - // Clone headers we need (convert to owned data) - let headers_to_forward: Vec<(String, String)> = headers - .iter() - .filter_map(|(name, value)| { - let header_name = name.as_str(); - let header_name_lc = header_name.to_ascii_lowercase(); - - // Skip hop-by-hop headers and any body-related headers when we are **not** forwarding a body. - let is_hop_by_hop = matches!( - header_name_lc.as_str(), - "host" - | "connection" - | "transfer-encoding" - | "upgrade" - | "proxy-authenticate" - | "proxy-authorization" - | "te" - | "trailers" - ); - - // If we are not forwarding a body (e.g. GET request) then forwarding `content-length` or - // `content-type` with an absent body makes many Monero nodes hang waiting for bytes and - // eventually close the connection. This manifests as the time-outs we have observed. 
- let is_body_header_without_body = body.is_none() - && matches!(header_name_lc.as_str(), "content-length" | "content-type"); - - if !is_hop_by_hop && !is_body_header_without_body { - if let Ok(header_value) = std::str::from_utf8(value.as_bytes()) { - return Some((header_name.to_string(), header_value.to_string())); - } - } - None - }) - .collect(); - - tokio::task::spawn_blocking(move || { - let mut request = client.request(&method, &url); - - // Forward essential headers (already filtered and converted to owned data) - for (header_name, header_value) in headers_to_forward { - request = request.set(&header_name, &header_value); - } - - // Execute the request with optional body - let response = if let Some(body_bytes) = body { - request.send_bytes(&body_bytes) - } else { - request.call() - }; - - let response = response.map_err(|e| HandlerError::RequestError(format!("{:#?}", e)))?; - - // Extract status and headers before consuming the response - let status = response.status(); - let header_names: Vec = response.headers_names(); - let headers: Vec<(String, String)> = header_names - .iter() - .filter_map(|name| { - response - .header(name) - .map(|value| (name.clone(), value.to_string())) - }) - .collect(); - - let mut body_bytes = Vec::new(); - response - .into_reader() - .read_to_end(&mut body_bytes) - .map_err(|e| { - HandlerError::RequestError(format!("Failed to read response body: {:#?}", e)) - })?; - - let mut axum_response = Response::new(Body::from(body_bytes)); - *axum_response.status_mut() = - StatusCode::from_u16(status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); - - // Copy response headers exactly - for (name, value) in headers { - if let (Ok(header_name), Ok(header_value)) = ( - axum::http::HeaderName::try_from(name.as_str()), - axum::http::HeaderValue::try_from(value.as_bytes()), - ) { - axum_response - .headers_mut() - .insert(header_name, header_value); +impl std::fmt::Display for SingleRequestError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SingleRequestError::ConnectionError(msg) => write!(f, "Connection error: {}", msg), + SingleRequestError::SendRequestError(msg) => write!(f, "Send request error: {}", msg), + SingleRequestError::CollectResponseError(msg) => { + write!(f, "Collect response error: {}", msg) } } + } +} - Ok(axum_response) - }) - .await - .map_err(|e| HandlerError::RequestError(format!("Task join error: {}", e)))? +fn display_node(node: &(String, String, i64)) -> String { + format!("{}://{}:{}", node.0, node.1, node.2) } async fn record_success(state: &AppState, scheme: &str, host: &str, port: i64, latency_ms: f64) { @@ -198,356 +530,6 @@ async fn record_failure(state: &AppState, scheme: &str, host: &str, port: i64) { } } -async fn single_raw_request( - client: &ureq::Agent, - node_url: (String, String, i64), - path: &str, - method: &str, - headers: &HeaderMap, - body: Option<&[u8]>, -) -> Result<(Response, (String, String, i64), f64), HandlerError> { - let start_time = Instant::now(); - - match raw_http_request(client, node_url.clone(), path, method, headers, body).await { - Ok(response) => { - let elapsed = start_time.elapsed(); - let latency_ms = elapsed.as_millis() as f64; - - // Check HTTP status code - only 200 is success! 
- if response.status().is_success() { - // For JSON-RPC endpoints, also check for JSON-RPC errors - if path == "/json_rpc" { - let (parts, body_stream) = response.into_parts(); - let body_bytes = axum::body::to_bytes(body_stream, usize::MAX) - .await - .map_err(|e| HandlerError::RequestError(format!("{:#?}", e)))?; - - if is_jsonrpc_error(&body_bytes) { - return Err(HandlerError::JsonRpcError("JSON-RPC error".to_string())); - } - - // Reconstruct response with the body we consumed - let response = Response::from_parts(parts, Body::from(body_bytes)); - Ok((response, node_url, latency_ms)) - } else { - // For non-JSON-RPC endpoints, HTTP success is enough - Ok((response, node_url, latency_ms)) - } - } else { - // Non-200 status codes are failures - Err(HandlerError::RequestError(format!( - "HTTP {}", - response.status() - ))) - } - } - Err(e) => Err(e), - } -} - -async fn sequential_requests( - state: &AppState, - path: &str, - method: &str, - headers: &HeaderMap, - body: Option<&[u8]>, -) -> Result { - const POOL_SIZE: usize = 20; - const MAX_JSONRPC_ERRORS: usize = 2; - - // Extract JSON-RPC method for better logging - let jsonrpc_method = if path == "/json_rpc" { - if let Some(body_data) = body { - extract_jsonrpc_method(body_data) - } else { - None - } - } else { - None - }; - - let mut tried_nodes = 0; - let mut collected_errors: Vec<((String, String, i64), HandlerError)> = Vec::new(); - - // Get the pool of nodes - let available_pool = { - let nodes = state - .node_pool - .get_top_reliable_nodes(POOL_SIZE) - .await - .map_err(|e| HandlerError::PoolError(e.to_string()))?; - - let pool: Vec<(String, String, i64)> = nodes - .into_iter() - .map(|node| (node.scheme, node.host, node.port as i64)) - .collect(); - - pool - }; - - if available_pool.is_empty() { - return Err(HandlerError::NoNodes); - } - - // Try nodes one by one sequentially - for node in available_pool.iter().take(POOL_SIZE) { - tried_nodes += 1; - let node_display = format!("{}://{}:{}", node.0, node.1, node.2); - - match &jsonrpc_method { - Some(rpc_method) => debug!( - "Trying {} request to {} (JSON-RPC: {}) - attempt {} of {}", - method, - node_display, - rpc_method, - tried_nodes, - available_pool.len().min(POOL_SIZE) - ), - None => debug!( - "Trying {} request to {} - attempt {} of {}", - method, - node_display, - tried_nodes, - available_pool.len().min(POOL_SIZE) - ), - } - - match single_raw_request( - &state.http_client, - node.clone(), - path, - method, - headers, - body, - ) - .await - { - Ok((response, winning_node, latency_ms)) => { - let (scheme, host, port) = &winning_node; - let winning_node_display = format!("{}://{}:{}", scheme, host, port); - - match &jsonrpc_method { - Some(rpc_method) => debug!( - "{} response from {} ({}ms) - SUCCESS after trying {} nodes! 
JSON-RPC: {}", - method, winning_node_display, latency_ms, tried_nodes, rpc_method - ), - None => debug!( - "{} response from {} ({}ms) - SUCCESS after trying {} nodes!", - method, winning_node_display, latency_ms, tried_nodes - ), - } - - record_success(state, &node.0, &node.1, node.2, latency_ms).await; - - return Ok(response); - } - Err(e) => { - collected_errors.push((node.clone(), e.clone())); - - debug!( - "Request failed with node {}: {} - checking if we should fail fast...", - node_display, e - ); - - // Count JSON-RPC errors by checking through all collected errors (type-safe) - let jsonrpc_error_count = collected_errors - .iter() - .filter(|(_, error)| matches!(error, HandlerError::JsonRpcError(_))) - .count(); - - // Fail fast after MAX_JSONRPC_ERRORS JSON-RPC errors - if jsonrpc_error_count >= MAX_JSONRPC_ERRORS { - match &jsonrpc_method { - Some(rpc_method) => error!( - "Failing fast after {} JSON-RPC errors for {} request (JSON-RPC: {}). These are likely request-specific issues that won't resolve on other servers.", - jsonrpc_error_count, method, rpc_method - ), - None => error!( - "Failing fast after {} JSON-RPC errors for {} request. These are likely request-specific issues that won't resolve on other servers.", - jsonrpc_error_count, method - ), - } - - // Record all non-JSON-RPC errors as failures - for (node, error) in collected_errors.iter() { - if !matches!(error, HandlerError::JsonRpcError(_)) { - record_failure(state, &node.0, &node.1, node.2).await; - } - } - - return Err(HandlerError::AllRequestsFailed( - collected_errors - .into_iter() - .map(|(node, error)| (display_node(&node), error.to_string())) - .collect(), - )); - } - - continue; - } - } - } - - // Record failures for all nodes that were tried - for (node, _) in collected_errors.iter() { - record_failure(state, &node.0, &node.1, node.2).await; - } - - // Log detailed error information - let detailed_errors: Vec = collected_errors - .iter() - .map(|(node, error)| format!("{}: {}", display_node(node), error)) - .collect(); - - match &jsonrpc_method { - Some(rpc_method) => error!( - "All {} requests failed after trying {} nodes (JSON-RPC: {}). Detailed errors:\n{}", - method, - tried_nodes, - rpc_method, - detailed_errors.join("\n") - ), - None => error!( - "All {} requests failed after trying {} nodes. Detailed errors:\n{}", - method, - tried_nodes, - detailed_errors.join("\n") - ), - } - - Err(HandlerError::AllRequestsFailed( - collected_errors - .into_iter() - .map(|(node, error)| (display_node(&node), error.to_string())) - .collect(), - )) -} - -/// Forward a request to the node pool, returning either a successful response or a simple -/// `500` with text "All nodes failed". Keeps the error handling logic in one place so the -/// public handlers stay readable. 
-async fn proxy_request( - state: &AppState, - path: &str, - method: &str, - headers: &HeaderMap, - body: Option<&[u8]>, -) -> Response { - match sequential_requests(state, path, method, headers, body).await { - Ok(res) => res, - Err(handler_error) => { - let error_response = match &handler_error { - HandlerError::AllRequestsFailed(node_errors) => { - json!({ - "error": "All nodes failed", - "details": { - "type": "AllRequestsFailed", - "message": "All proxy requests to available nodes failed", - "node_errors": node_errors.iter().map(|(node, error)| { - json!({ - "node": node, - "error": error - }) - }).collect::>(), - "total_nodes_tried": node_errors.len() - } - }) - } - HandlerError::NoNodes => { - json!({ - "error": "No nodes available", - "details": { - "type": "NoNodes", - "message": "No healthy nodes available in the pool" - } - }) - } - HandlerError::PoolError(msg) => { - json!({ - "error": "Pool error", - "details": { - "type": "PoolError", - "message": msg - } - }) - } - HandlerError::RequestError(msg) => { - json!({ - "error": "Request error", - "details": { - "type": "RequestError", - "message": msg - } - }) - } - HandlerError::JsonRpcError(msg) => { - json!({ - "error": "JSON-RPC error", - "details": { - "type": "JsonRpcError", - "message": msg - } - }) - } - }; - - Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .header("content-type", "application/json") - .body(Body::from(error_response.to_string())) - .unwrap_or_else(|_| Response::new(Body::empty())) - } - } -} - -#[axum::debug_handler] -pub async fn proxy_handler( - State(state): State, - method: Method, - uri: axum::http::Uri, - headers: HeaderMap, - body: axum::body::Bytes, -) -> Response { - let body_size = body.len(); - let request_id = Uuid::new_v4(); - let path = uri.path().to_string(); - let method_str = method.to_string(); - let path_clone = path.clone(); - - // Extract JSON-RPC method for tracing span - let body_option = (!body.is_empty()).then_some(&body[..]); - let jsonrpc_method = if path == "/json_rpc" { - if let Some(body_data) = body_option { - extract_jsonrpc_method(body_data) - } else { - None - } - } else { - None - }; - let jsonrpc_method_for_span = jsonrpc_method.as_deref().unwrap_or("N/A").to_string(); - - async move { - match &jsonrpc_method { - Some(rpc_method) => debug!( - "Proxying {} {} ({} bytes) - JSON-RPC method: {}", - method, path, body_size, rpc_method - ), - None => debug!("Proxying {} {} ({} bytes)", method, path, body_size), - } - - proxy_request(&state, &path, method.as_str(), &headers, body_option).await - } - .instrument(info_span!("proxy_request", - request_id = %request_id, - method = %method_str, - path = %path_clone, - body_size = body_size, - jsonrpc_method = %jsonrpc_method_for_span - )) - .await -} - #[axum::debug_handler] pub async fn stats_handler(State(state): State) -> Response { async move { @@ -559,7 +541,8 @@ pub async fn stats_handler(State(state): State) -> Response { "healthy_node_count": status.healthy_node_count, "successful_health_checks": status.successful_health_checks, "unsuccessful_health_checks": status.unsuccessful_health_checks, - "top_reliable_nodes": status.top_reliable_nodes + "top_reliable_nodes": status.top_reliable_nodes, + "bandwidth_kb_per_sec": status.bandwidth_kb_per_sec }); Response::builder() diff --git a/src-gui/src/renderer/components/pages/help/MoneroPoolHealthBox.tsx b/src-gui/src/renderer/components/pages/help/MoneroPoolHealthBox.tsx index 8e865978..3380d1f3 100644 --- 
a/src-gui/src/renderer/components/pages/help/MoneroPoolHealthBox.tsx +++ b/src-gui/src/renderer/components/pages/help/MoneroPoolHealthBox.tsx @@ -75,6 +75,12 @@ export default function MoneroPoolHealthBox() { variant="outlined" size="small" /> + 10 ? "info" : "default"} + variant="outlined" + size="small" + /> ); }; diff --git a/src-gui/src/renderer/components/pages/help/SettingsBox.tsx b/src-gui/src/renderer/components/pages/help/SettingsBox.tsx index 7c3b6190..230bf5d7 100644 --- a/src-gui/src/renderer/components/pages/help/SettingsBox.tsx +++ b/src-gui/src/renderer/components/pages/help/SettingsBox.tsx @@ -38,6 +38,7 @@ import { setFiatCurrency, setTheme, setTorEnabled, + setEnableMoneroTor, setUseMoneroRpcPool, setDonateToDevelopment, } from "store/features/settingsSlice"; @@ -91,6 +92,7 @@ export default function SettingsBox() { + @@ -715,6 +717,42 @@ export function TorSettings() { ); } +/** + * A setting that allows you to enable or disable routing Monero wallet traffic through Tor. + * This setting is only visible when Tor is enabled. + */ +function MoneroTorSettings() { + const dispatch = useAppDispatch(); + const torEnabled = useSettings((settings) => settings.enableTor); + const enableMoneroTor = useSettings((settings) => settings.enableMoneroTor); + + const handleChange = (event: React.ChangeEvent) => + dispatch(setEnableMoneroTor(event.target.checked)); + + // Hide this setting if Tor is disabled entirely + if (!torEnabled) { + return null; + } + + return ( + + + + + + + + + ); +} + /** * A setting that allows you to manage rendezvous points for maker discovery */ diff --git a/src-gui/src/renderer/components/pages/monero/components/WalletOverview.tsx b/src-gui/src/renderer/components/pages/monero/components/WalletOverview.tsx index 1f385b9e..9bd4f4cb 100644 --- a/src-gui/src/renderer/components/pages/monero/components/WalletOverview.tsx +++ b/src-gui/src/renderer/components/pages/monero/components/WalletOverview.tsx @@ -1,14 +1,5 @@ -import { - Box, - Typography, - CircularProgress, - Button, - Card, - CardContent, - Divider, - CardHeader, - LinearProgress, -} from "@mui/material"; +import { Box, Typography, Card, LinearProgress } from "@mui/material"; +import { useAppSelector } from "store/hooks"; import { PiconeroAmount } from "../../../other/Units"; import { FiatPiconeroAmount } from "../../../other/Units"; import StateIndicator from "./StateIndicator"; @@ -30,23 +21,77 @@ export default function WalletOverview({ balance, syncProgress, }: WalletOverviewProps) { + const lowestCurrentBlock = useAppSelector( + (state) => state.wallet.state.lowestCurrentBlock, + ); + + const poolStatus = useAppSelector((state) => state.pool.status); + const pendingBalance = parseFloat(balance.total_balance) - parseFloat(balance.unlocked_balance); const isSyncing = syncProgress && syncProgress.progress_percentage < 100; const blocksLeft = syncProgress?.target_block - syncProgress?.current_block; + + // Treat blocksLeft = 1 as if we have no direct knowledge + const hasDirectKnowledge = blocksLeft != null && blocksLeft > 1; + + // syncProgress.progress_percentage is not good to display + // assuming we have an old wallet, eventually we will always only use the last few cm of the progress bar + // + // We calculate our own progress percentage + // lowestCurrentBlock is the lowest block we have seen + // currentBlock is the current block we are on (how war we've synced) + // targetBlock is the target block we are syncing to + // + // The progressPercentage below is the progress on that path + // If 
the lowestCurrentBlock is null, we fallback to the syncProgress.progress_percentage + const progressPercentage = + lowestCurrentBlock === null || !syncProgress + ? syncProgress?.progress_percentage || 0 + : syncProgress.target_block === lowestCurrentBlock + ? 100 // Fully synced when target equals lowest current block + : Math.max( + 0, + Math.min( + 100, + ((syncProgress.current_block - lowestCurrentBlock) / + (syncProgress.target_block - lowestCurrentBlock)) * + 100, + ), + ); + + const isStuck = poolStatus?.bandwidth_kb_per_sec != null && poolStatus.bandwidth_kb_per_sec < 0.01; + + // Calculate estimated time remaining for sync + const formatTimeRemaining = (seconds: number): string => { + if (seconds < 60) return `${Math.round(seconds)}s`; + if (seconds < 3600) return `${Math.round(seconds / 60)}m`; + if (seconds < 86400) return `${Math.round(seconds / 3600)}h`; + return `${Math.round(seconds / 86400)}d`; + }; + + const estimatedTimeRemaining = + hasDirectKnowledge && poolStatus?.bandwidth_kb_per_sec != null && poolStatus.bandwidth_kb_per_sec > 0 + ? (blocksLeft * 130) / poolStatus.bandwidth_kb_per_sec // blocks * 130kb / kb_per_sec = seconds + : null; return ( {syncProgress && syncProgress.progress_percentage < 100 && ( )} @@ -54,95 +99,105 @@ export default function WalletOverview({ {/* Balance */} - - Available Funds - - - - - - - - {pendingBalance > 0 && ( - <> - - Pending - - - - - - - - - - )} - + {/* Left side content */} - - {isSyncing ? "syncing" : "synced"} + + Available Funds + + + - - - {isSyncing && ( - {blocksLeft.toLocaleString()} blocks left + + + {pendingBalance > 0 && ( + + + Pending + + + + + + + + )} + + {/* Right side - simple approach */} + + + + {isSyncing && hasDirectKnowledge && ( + + {blocksLeft?.toLocaleString()} blocks left + + )} + {poolStatus && isSyncing && !isStuck && ( + <> + + {estimatedTimeRemaining && !isStuck && ( + <>{formatTimeRemaining(estimatedTimeRemaining)} left + )} / {poolStatus.bandwidth_kb_per_sec?.toFixed(1) ?? '0.0'} KB/s + + + )} + + ); diff --git a/src-gui/src/renderer/components/pages/monero/components/WalletPageLoadingState.tsx b/src-gui/src/renderer/components/pages/monero/components/WalletPageLoadingState.tsx index decdf3d6..9c7fe440 100644 --- a/src-gui/src/renderer/components/pages/monero/components/WalletPageLoadingState.tsx +++ b/src-gui/src/renderer/components/pages/monero/components/WalletPageLoadingState.tsx @@ -21,30 +21,29 @@ export default function WalletPageLoadingState() { {/* Balance */} - - Available Funds - - - - - - - + + Available Funds + + + + + + + + - loading diff --git a/src-gui/src/renderer/rpc.ts b/src-gui/src/renderer/rpc.ts index ae1cbde9..a8339e9b 100644 --- a/src-gui/src/renderer/rpc.ts +++ b/src-gui/src/renderer/rpc.ts @@ -319,6 +319,8 @@ export async function initializeContext() { // For Monero nodes, determine whether to use pool or custom node const useMoneroRpcPool = store.getState().settings.useMoneroRpcPool; + const useMoneroTor = store.getState().settings.enableMoneroTor; + const moneroNodeUrl = store.getState().settings.nodes[network][Blockchain.Monero][0] ?? 
null; @@ -341,6 +343,7 @@ export async function initializeContext() { electrum_rpc_urls: bitcoinNodes, monero_node_config: moneroNodeConfig, use_tor: useTor, + enable_monero_tor: useMoneroTor, }; logger.info("Initializing context with settings", tauriSettings); diff --git a/src-gui/src/store/features/settingsSlice.ts b/src-gui/src/store/features/settingsSlice.ts index b17acef0..6875f11e 100644 --- a/src-gui/src/store/features/settingsSlice.ts +++ b/src-gui/src/store/features/settingsSlice.ts @@ -19,6 +19,8 @@ export interface SettingsState { fiatCurrency: FiatCurrency; /// Whether to enable Tor for p2p connections enableTor: boolean; + /// Whether to route Monero wallet traffic through Tor + enableMoneroTor: boolean; /// Whether to use the Monero RPC pool for load balancing (true) or custom nodes (false) useMoneroRpcPool: boolean; userHasSeenIntroduction: boolean; @@ -126,6 +128,7 @@ const initialState: SettingsState = { fetchFiatPrices: false, fiatCurrency: FiatCurrency.Usd, enableTor: true, + enableMoneroTor: false, // Default to not routing Monero traffic through Tor useMoneroRpcPool: true, // Default to using RPC pool userHasSeenIntroduction: false, rendezvousPoints: DEFAULT_RENDEZVOUS_POINTS, @@ -215,6 +218,9 @@ const alertsSlice = createSlice({ setTorEnabled(slice, action: PayloadAction) { slice.enableTor = action.payload; }, + setEnableMoneroTor(slice, action: PayloadAction) { + slice.enableMoneroTor = action.payload; + }, setUseMoneroRpcPool(slice, action: PayloadAction) { slice.useMoneroRpcPool = action.payload; }, @@ -236,6 +242,7 @@ export const { setFetchFiatPrices, setFiatCurrency, setTorEnabled, + setEnableMoneroTor, setUseMoneroRpcPool, setUserHasSeenIntroduction, addRendezvousPoint, diff --git a/src-gui/src/store/features/walletSlice.ts b/src-gui/src/store/features/walletSlice.ts index 7f2949b8..71178b73 100644 --- a/src-gui/src/store/features/walletSlice.ts +++ b/src-gui/src/store/features/walletSlice.ts @@ -11,6 +11,7 @@ interface WalletState { balance: GetMoneroBalanceResponse | null; syncProgress: GetMoneroSyncProgressResponse | null; history: GetMoneroHistoryResponse | null; + lowestCurrentBlock: number | null; } export interface WalletSlice { @@ -24,6 +25,7 @@ const initialState: WalletSlice = { balance: null, syncProgress: null, history: null, + lowestCurrentBlock: null, }, }; @@ -42,6 +44,16 @@ export const walletSlice = createSlice({ slice, action: PayloadAction, ) { + slice.state.lowestCurrentBlock = Math.min( + // We ignore anything below 10 blocks as this may be something like wallet2 + // sending a wrong value when it hasn't initialized yet + slice.state.lowestCurrentBlock < 10 || + slice.state.lowestCurrentBlock === null + ? 
Infinity + : slice.state.lowestCurrentBlock, + action.payload.current_block, + ); + slice.state.syncProgress = action.payload; }, setHistory(slice, action: PayloadAction) { diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index 075f93e0..4cd8c394 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -440,6 +440,7 @@ async fn initialize_context( .with_json(false) .with_debug(true) .with_tor(settings.use_tor) + .with_enable_monero_tor(settings.enable_monero_tor) .with_tauri(tauri_handle.clone()) .build() .await; diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index d71d823c..8a965c9c 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -25,7 +25,7 @@ use structopt::clap; use structopt::clap::ErrorKind; use swap::asb::command::{parse_args, Arguments, Command}; use swap::asb::{cancel, punish, redeem, refund, safely_abort, EventLoop, Finality, KrakenRate}; -use swap::common::tor::init_tor_client; +use swap::common::tor::{bootstrap_tor_client, create_tor_client}; use swap::common::tracing_util::Format; use swap::common::{self, get_logs, warn_if_outdated}; use swap::database::{open_db, AccessMode}; @@ -201,8 +201,10 @@ pub async fn main() -> Result<()> { let kraken_rate = KrakenRate::new(config.maker.ask_spread, kraken_price_updates); let namespace = XmrBtcNamespace::from_is_testnet(testnet); - // Initialize Tor client - let tor_client = init_tor_client(&config.data.dir, None).await?.into(); + // Initialize and bootstrap Tor client + let tor_client = create_tor_client(&config.data.dir).await?; + bootstrap_tor_client(tor_client.clone(), None).await?; + let tor_client = tor_client.into(); let (mut swarm, onion_addresses) = swarm::asb( &seed, @@ -495,10 +497,7 @@ async fn init_monero_wallet( let (server_info, _status_receiver, _pool_handle) = monero_rpc_pool::start_server_with_random_port( - monero_rpc_pool::config::Config::new_random_port( - "127.0.0.1".to_string(), - config.data.dir.join("monero-rpc-pool"), - ), + monero_rpc_pool::config::Config::new_random_port(config.data.dir.join("monero-rpc-pool")), env_config.monero_network, ) .await diff --git a/swap/src/cli/api.rs b/swap/src/cli/api.rs index 29323be9..174d0173 100644 --- a/swap/src/cli/api.rs +++ b/swap/src/cli/api.rs @@ -3,7 +3,7 @@ pub mod tauri_bindings; use crate::cli::api::tauri_bindings::SeedChoice; use crate::cli::command::{Bitcoin, Monero}; -use crate::common::tor::init_tor_client; +use crate::common::tor::{bootstrap_tor_client, create_tor_client}; use crate::common::tracing_util::Format; use crate::database::{open_db, AccessMode}; use crate::network::rendezvous::XmrBtcNamespace; @@ -204,6 +204,7 @@ pub struct ContextBuilder { debug: bool, json: bool, tor: bool, + enable_monero_tor: bool, tauri_handle: Option, } @@ -227,6 +228,7 @@ impl ContextBuilder { debug: false, json: false, tor: false, + enable_monero_tor: false, tauri_handle: None, } } @@ -280,6 +282,12 @@ impl ContextBuilder { self } + /// Whether to route Monero wallet traffic through Tor (default false) + pub fn with_enable_monero_tor(mut self, enable_monero_tor: bool) -> Self { + self.enable_monero_tor = enable_monero_tor; + self + } + /// Takes the builder, initializes the context by initializing the wallets and other components and returns the Context. 
pub async fn build(self) -> Result { // This is the data directory for the eigenwallet (wallet files) @@ -314,12 +322,29 @@ impl ContextBuilder { ); }); - // Start the rpc pool for the monero wallet + // Create unbootstrapped Tor client early if enabled + let unbootstrapped_tor_client = if self.tor { + match create_tor_client(&base_data_dir).await.inspect_err(|err| { + tracing::warn!(%err, "Failed to create Tor client. We will continue without Tor"); + }) { + Ok(client) => Some(client), + Err(_) => None, + } + } else { + tracing::warn!("Internal Tor client not enabled, skipping initialization"); + None + }; + + // Start the rpc pool for the monero wallet with optional Tor client based on enable_monero_tor setting let (server_info, mut status_receiver, pool_handle) = monero_rpc_pool::start_server_with_random_port( - monero_rpc_pool::config::Config::new_random_port( - "127.0.0.1".to_string(), + monero_rpc_pool::config::Config::new_random_port_with_tor_client( base_data_dir.join("monero-rpc-pool"), + if self.enable_monero_tor { + unbootstrapped_tor_client.clone() + } else { + None + }, ), match self.is_testnet { true => crate::monero::Network::Stagenet, @@ -460,25 +485,25 @@ impl ContextBuilder { } }; - let initialize_tor_client = async { - // Don't init a tor client unless we should use it. - if !self.tor { - tracing::warn!("Internal Tor client not enabled, skipping initialization"); - return Ok(None); + let bootstrap_tor_client_task = async { + // Bootstrap the Tor client if we have one + match unbootstrapped_tor_client.clone() { + Some(tor_client) => { + bootstrap_tor_client(tor_client.clone(), tauri_handle.clone()) + .await + .inspect_err(|err| { + tracing::warn!(%err, "Failed to bootstrap Tor client. It will remain unbootstrapped"); + }) + .ok(); + + Ok(Some(tor_client)) + } + None => Ok(None), } - - let maybe_tor_client = init_tor_client(&data_dir, tauri_handle.clone()) - .await - .inspect_err(|err| { - tracing::warn!(%err, "Failed to create Tor client. We will continue without Tor"); - }) - .ok(); - - Ok(maybe_tor_client) }; let (bitcoin_wallet, tor) = - tokio::try_join!(initialize_bitcoin_wallet, initialize_tor_client,)?; + tokio::try_join!(initialize_bitcoin_wallet, bootstrap_tor_client_task,)?; // If we have a bitcoin wallet and a tauri handle, we start a background task if let Some(wallet) = bitcoin_wallet.clone() { diff --git a/swap/src/cli/api/tauri_bindings.rs b/swap/src/cli/api/tauri_bindings.rs index 62010ea0..e2380bca 100644 --- a/swap/src/cli/api/tauri_bindings.rs +++ b/swap/src/cli/api/tauri_bindings.rs @@ -1008,6 +1008,8 @@ pub struct TauriSettings { pub electrum_rpc_urls: Vec, /// Whether to initialize and use a tor client. 
pub use_tor: bool, + /// Whether to route Monero wallet traffic through Tor + pub enable_monero_tor: bool, } #[typeshare] diff --git a/swap/src/common/tor.rs b/swap/src/common/tor.rs index 57e19b62..7b61b351 100644 --- a/swap/src/common/tor.rs +++ b/swap/src/common/tor.rs @@ -8,9 +8,9 @@ use arti_client::{config::TorClientConfigBuilder, status::BootstrapStatus, Error use futures::StreamExt; use tor_rtcompat::tokio::TokioRustlsRuntime; -pub async fn init_tor_client( +/// Creates an unbootstrapped Tor client +pub async fn create_tor_client( data_dir: &Path, - tauri_handle: Option, ) -> Result>, Error> { // We store the Tor state in the data directory let data_dir = data_dir.join("tor"); @@ -23,20 +23,28 @@ pub async fn init_tor_client( .build() .expect("We initialized the Tor client all required attributes"); - // Start the Arti client, and let it bootstrap a connection to the Tor network. - // (This takes a while to gather the necessary directory information. - // It uses cached information when possible.) + // Create the Arti client without bootstrapping let runtime = TokioRustlsRuntime::current().expect("We are always running with tokio"); - tracing::debug!("Bootstrapping Tor client"); + tracing::debug!("Creating unbootstrapped Tor client"); let tor_client = TorClient::with_runtime(runtime) .config(config) .create_unbootstrapped_async() .await?; + Ok(Arc::new(tor_client)) +} + +/// Bootstraps an existing Tor client +pub async fn bootstrap_tor_client( + tor_client: Arc>, + tauri_handle: Option, +) -> Result<(), Error> { let mut bootstrap_events = tor_client.bootstrap_events(); + tracing::debug!("Bootstrapping Tor client"); + // Create a background progress handle for the Tor bootstrap process // The handle manages the TauriHandle internally, so we don't need to worry about it anymore let progress_handle = @@ -67,7 +75,7 @@ pub async fn init_tor_client( }, }?; - Ok(Arc::new(tor_client)) + Ok(()) } // A trait to convert the Tor bootstrap event into a TauriBootstrapStatus
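
For reference, a rough usage sketch (not part of the patch itself) of how the pieces introduced above fit together: an unbootstrapped Tor client is created up front, handed to the pool via Config::new_random_port_with_tor_client, and bootstrapped in the background, mirroring the flow in monero-rpc-pool/src/main.rs and swap/src/cli/api.rs. The wrapper function, its signature, and the choice of monero::Network::Mainnet are illustrative assumptions, not code from this change.

use std::sync::Arc;

use arti_client::{TorClient, TorClientConfig};
use monero_rpc_pool::{config::Config, start_server_with_random_port};

// Hypothetical helper; only the called APIs come from this diff.
async fn start_pool_with_optional_tor(
    data_dir: std::path::PathBuf,
    use_tor: bool,
) -> anyhow::Result<()> {
    let tor_client = if use_tor {
        // Create the client unbootstrapped so the pool can start serving immediately.
        let runtime = tor_rtcompat::tokio::TokioRustlsRuntime::current()?;
        let client = Arc::new(
            TorClient::with_runtime(runtime)
                .config(TorClientConfig::default())
                .create_unbootstrapped_async()
                .await?,
        );

        // Bootstrap in the background; until bootstrap_status().ready_for_traffic()
        // is true, proxy_to_single_node falls back to clearnet connections.
        let bootstrapping = client.clone();
        tokio::spawn(async move {
            if let Err(e) = bootstrapping.bootstrap().await {
                tracing::error!("Tor bootstrap failed: {e}");
            }
        });

        Some(client)
    } else {
        // `None` keeps the pool on plain TCP/TLS connections.
        None
    };

    let config = Config::new_random_port_with_tor_client(
        data_dir.join("monero-rpc-pool"),
        tor_client,
    );
    let (_server_info, _status_receiver, _pool_handle) =
        start_server_with_random_port(config, monero::Network::Mainnet).await?;

    Ok(())
}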