diff --git a/CHANGELOG.md b/CHANGELOG.md
index 69835e09..e6bcae6c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - GUI: Speedup startup by concurrently bootstrapping Tor and requesting the user to select a wallet
 - GUI: Add white background to QR code modal to make it better scannable
 - GUI + CLI + ASB: Add `/dns4/rendezvous.observer/tcp/8888/p2p/12D3KooWMjceGXrYuGuDMGrfmJxALnSDbK4km6s1i1sJEgDTgGQa` to the default list of rendezvous points
+- GUI + CLI + ASB: Monero RPC pool now prioritizes nodes with pre-established TCP connections
 
 ## [3.0.0-beta.6] - 2025-08-07
 
diff --git a/monero-rpc-pool/.sqlx/query-44ddff5bdf5b56e9c1a9848641181de4441c8974b2d1304804874cf620420ad4.json b/monero-rpc-pool/.sqlx/query-44ddff5bdf5b56e9c1a9848641181de4441c8974b2d1304804874cf620420ad4.json
deleted file mode 100644
index 42863654..00000000
--- a/monero-rpc-pool/.sqlx/query-44ddff5bdf5b56e9c1a9848641181de4441c8974b2d1304804874cf620420ad4.json
+++ /dev/null
@@ -1,28 +0,0 @@
-{
-  "db_name": "SQLite",
-  "query": "\n SELECT \n n.scheme,\n n.host,\n n.port\n FROM monero_nodes n\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count\n FROM (\n SELECT node_id, was_successful\n FROM health_checks \n ORDER BY timestamp DESC \n LIMIT 1000\n ) recent_checks\n GROUP BY node_id\n ) stats ON n.id = stats.node_id\n WHERE n.network = ?\n ORDER BY \n CASE \n WHEN (COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0)) > 0 \n THEN CAST(COALESCE(stats.success_count, 0) AS REAL) / CAST(COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0) AS REAL)\n ELSE 0.0 \n END DESC\n LIMIT ?\n ",
-  "describe": {
-    "columns": [
-      {
-        "name": "scheme",
-        "ordinal": 0,
-        "type_info": "Text"
-      },
-      {
-        "name": "host",
-        "ordinal": 1,
-        "type_info": "Text"
-      },
-      {
-        "name": "port",
-        "ordinal": 2,
-        "type_info": "Integer"
-      }
-    ],
-    "parameters": {
-      "Right": 2
-    },
-    "nullable": [false, false, false]
-  },
-  "hash": "44ddff5bdf5b56e9c1a9848641181de4441c8974b2d1304804874cf620420ad4"
-}
diff --git a/monero-rpc-pool/.sqlx/query-4ce7c42906ba69e0c8e1c0dad952956edd582a0edecd45710e22dcb28785eeab.json b/monero-rpc-pool/.sqlx/query-4ce7c42906ba69e0c8e1c0dad952956edd582a0edecd45710e22dcb28785eeab.json
new file mode 100644
index 00000000..c35d06f7
--- /dev/null
+++ b/monero-rpc-pool/.sqlx/query-4ce7c42906ba69e0c8e1c0dad952956edd582a0edecd45710e22dcb28785eeab.json
@@ -0,0 +1,28 @@
+{
+  "db_name": "SQLite",
+  "query": "\n WITH scored AS (\n SELECT \n n.scheme,\n n.host,\n n.port,\n CASE \n WHEN (COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0)) > 0 \n THEN CAST(COALESCE(stats.success_count, 0) AS REAL) / CAST(COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0) AS REAL)\n ELSE 0.0 \n END as base_score,\n MAX(\n ABS(RANDOM()) / CAST(0x7fffffffffffffff AS REAL),\n ABS(RANDOM()) / CAST(0x7fffffffffffffff AS REAL),\n ABS(RANDOM()) / CAST(0x7fffffffffffffff AS REAL)\n ) as r\n FROM monero_nodes n\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count\n FROM (\n SELECT node_id, was_successful\n FROM health_checks \n ORDER BY timestamp DESC \n LIMIT 1000\n ) recent_checks\n GROUP BY node_id\n ) stats ON n.id = stats.node_id\n WHERE n.network = ?\n )\n SELECT scheme, host, port\n FROM scored\n ORDER BY (base_score * r) DESC, r DESC\n LIMIT ?\n ",
+  "describe": {
+    "columns": [
+      {
+        "name": "scheme",
+        "ordinal": 0,
+        "type_info": "Text"
+      },
+      {
+        "name": "host",
+        "ordinal": 1,
+        "type_info": "Text"
+      },
+      {
+        "name": "port",
+        "ordinal": 2,
+        "type_info": "Integer"
+      }
+    ],
+    "parameters": {
+      "Right": 2
+    },
+    "nullable": [false, false, false]
+  },
+  "hash": "4ce7c42906ba69e0c8e1c0dad952956edd582a0edecd45710e22dcb28785eeab"
+}
diff --git a/monero-rpc-pool/src/bin/stress_test_downloader.rs b/monero-rpc-pool/src/bin/stress_test_downloader.rs
index 7a2b0436..b00a7b1a 100644
--- a/monero-rpc-pool/src/bin/stress_test_downloader.rs
+++ b/monero-rpc-pool/src/bin/stress_test_downloader.rs
@@ -143,7 +143,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
     let client = Arc::new(
         reqwest::Client::builder()
-            .timeout(std::time::Duration::from_secs(3 * 60 + 30)) // used in wallet2
+            .timeout(std::time::Duration::from_secs(10 * 60 + 30)) // wallet2 uses 3 * 60 + 30; allow extra time for the stress test
             .build()
             .expect("Failed to build reqwest client"),
     );
diff --git a/monero-rpc-pool/src/connection_pool.rs b/monero-rpc-pool/src/connection_pool.rs
index 8013afdc..348b450b 100644
--- a/monero-rpc-pool/src/connection_pool.rs
+++ b/monero-rpc-pool/src/connection_pool.rs
@@ -175,4 +175,18 @@ impl ConnectionPool {
             }
         }
     }
+
+    /// Check if there's an available (unlocked) connection for the given key.
+    pub async fn has_available_connection(&self, key: &StreamKey) -> bool {
+        let map = self.inner.read().await;
+        if let Some(vec_lock) = map.get(key) {
+            let vec = vec_lock.read().await;
+            for sender_mutex in vec.iter() {
+                if sender_mutex.try_lock().is_ok() {
+                    return true;
+                }
+            }
+        }
+        false
+    }
 }
diff --git a/monero-rpc-pool/src/database.rs b/monero-rpc-pool/src/database.rs
index 8843a966..55a75c61 100644
--- a/monero-rpc-pool/src/database.rs
+++ b/monero-rpc-pool/src/database.rs
@@ -232,38 +232,52 @@ impl Database {
     }
 
     /// Get top nodes based on success rate
+    /// Adds randomness so the same top-scoring nodes are not always returned in the same order
     pub async fn get_top_nodes_by_recent_success(
        &self,
        network: &str,
        limit: i64,
    ) -> Result<Vec<(String, String, u16)>> {
+        // Randomized ordering: r = max of 3 Uniform(0,1) draws, so P(r <= x) = x^3 and E[r] = 3/4 (biased toward 1).
+        // Rank by (base_score * r) so top nodes remain preferred but can shuffle.
+        // r is drawn once per row in the CTE and reused in ORDER BY.
+        // Increase RANDOM() terms in MAX(...) to strengthen the bias.
        let rows = sqlx::query!(
            r#"
-            SELECT 
-                n.scheme,
-                n.host,
-                n.port
-            FROM monero_nodes n
-            LEFT JOIN (
+            WITH scored AS (
                SELECT 
-                    node_id,
-                    SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,
-                    SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count
-                FROM (
-                    SELECT node_id, was_successful
-                    FROM health_checks 
-                    ORDER BY timestamp DESC 
-                    LIMIT 1000
-                ) recent_checks
-                GROUP BY node_id
-            ) stats ON n.id = stats.node_id
-            WHERE n.network = ?
-            ORDER BY 
-                CASE 
-                    WHEN (COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0)) > 0 
-                    THEN CAST(COALESCE(stats.success_count, 0) AS REAL) / CAST(COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0) AS REAL)
-                    ELSE 0.0 
-                END DESC
+                    n.scheme,
+                    n.host,
+                    n.port,
+                    CASE 
+                        WHEN (COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0)) > 0 
+                        THEN CAST(COALESCE(stats.success_count, 0) AS REAL) / CAST(COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0) AS REAL)
+                        ELSE 0.0 
+                    END as base_score,
+                    MAX(
+                        ABS(RANDOM()) / CAST(0x7fffffffffffffff AS REAL),
+                        ABS(RANDOM()) / CAST(0x7fffffffffffffff AS REAL),
+                        ABS(RANDOM()) / CAST(0x7fffffffffffffff AS REAL)
+                    ) as r
+                FROM monero_nodes n
+                LEFT JOIN (
+                    SELECT 
+                        node_id,
+                        SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,
+                        SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count
+                    FROM (
+                        SELECT node_id, was_successful
+                        FROM health_checks 
+                        ORDER BY timestamp DESC 
+                        LIMIT 1000
+                    ) recent_checks
+                    GROUP BY node_id
+                ) stats ON n.id = stats.node_id
+                WHERE n.network = ?
+            )
+            SELECT scheme, host, port
+            FROM scored
+            ORDER BY (base_score * r) DESC, r DESC
             LIMIT ?
             "#,
             network,
diff --git a/monero-rpc-pool/src/proxy.rs b/monero-rpc-pool/src/proxy.rs
index 2c411560..379acd97 100644
--- a/monero-rpc-pool/src/proxy.rs
+++ b/monero-rpc-pool/src/proxy.rs
@@ -21,17 +21,20 @@ use tracing::{error, info_span, Instrument};
 
 use crate::AppState;
 
 /// wallet2.h has a default timeout of 3 minutes + 30 seconds.
-/// We assume this is a reasonable timeout. We use half of that to allow us do a single retry.
+/// We assume this is a reasonable timeout. We use half of that.
 /// https://github.com/SNeedlewoods/seraphis_wallet/blob/5f714f147fd29228698070e6bd80e41ce2f86fb0/src/wallet/wallet2.h#L238
 static TIMEOUT: Duration = Duration::from_secs(3 * 60 + 30).checked_div(2).unwrap();
 
+/// If the main node does not finish within this period, we start a hedged request.
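+/// With TIMEOUT = 105 s (half of wallet2's 210 s), this soft timeout works out to 52.5 s.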
+static SOFT_TIMEOUT: Duration = TIMEOUT.checked_div(2).unwrap();
+
 /// Trait alias for a stream that can be used with hyper
 trait HyperStream: AsyncRead + AsyncWrite + Unpin + Send {}
 impl<T: AsyncRead + AsyncWrite + Unpin + Send> HyperStream for T {}
 
 #[axum::debug_handler]
 pub async fn proxy_handler(State(state): State<AppState>, request: Request) -> Response {
-    static POOL_SIZE: usize = 10;
+    static POOL_SIZE: usize = 20;
 
     // Get the pool of nodes
     let available_pool = state
@@ -88,6 +91,38 @@ async fn proxy_to_multiple_nodes(
         return Err(HandlerError::NoNodes);
     }
 
+    // Sort nodes to prioritize those with available connections
+    // Check if we're using Tor for this request
+    let use_tor = match &state.tor_client {
+        Some(tc)
+            if tc.bootstrap_status().ready_for_traffic() && !request.clearnet_whitelisted() =>
+        {
+            true
+        }
+        _ => false,
+    };
+
+    // Create a vector of (node, has_connection) pairs
+    let mut nodes_with_availability = Vec::new();
+    for node in nodes.iter() {
+        let key = (node.0.clone(), node.1.clone(), node.2, use_tor);
+        let has_connection = state.connection_pool.has_available_connection(&key).await;
+        nodes_with_availability.push((node.clone(), has_connection));
+    }
+
+    // Sort: nodes with available connections come first
+    nodes_with_availability.sort_by(|a, b| {
+        // If a has connection and b doesn't, a comes first
+        // If both have or both don't have, maintain original order (the sort is stable)
+        b.1.cmp(&a.1)
+    });
+
+    // Extract just the sorted nodes
+    let nodes: Vec<(String, String, u16)> = nodes_with_availability
+        .into_iter()
+        .map(|(node, _)| node)
+        .collect();
+
     let mut collected_errors: Vec<((String, String, u16), HandlerError)> = Vec::new();
 
     fn push_error(
@@ -104,86 +139,128 @@ async fn proxy_to_multiple_nodes(
     // Success is defined as either:
     // - a raw HTTP response with a 200 response code
     // - a JSON-RPC response with status code 200 and no error field
-    for node in nodes {
-        // Node attempt logging without creating spans to reduce overhead
+    for pair in nodes.chunks(2) {
+        let node = pair[0].clone();
+        let next = pair.get(1).cloned();
+
         let node_uri = display_node(&node);
 
         // Start timing the request
         let latency = std::time::Instant::now();
 
-        let response = match proxy_to_single_node(state, request.clone(), &node)
-            .instrument(info_span!(
-                "connection",
-                node = node_uri,
-                tor = state.tor_client.is_some(),
-            ))
-            .await
-        {
-            Ok(response) => response,
-            Err(e) => {
-                push_error(&mut collected_errors, node, HandlerError::PhyiscalError(e));
-                continue;
+        let mut winner = node.clone();
+        let response = if let Some(hedge_node) = next.as_ref() {
+            let hedge_node_uri = display_node(hedge_node);
+
+            // Use hedged proxy: race node vs next
+            match proxy_to_node_with_hedge(state, request.clone(), &node, hedge_node)
+                .instrument(info_span!(
+                    "connection",
+                    node = node_uri,
+                    hedge_node = hedge_node_uri,
+                    tor = state.tor_client.is_some(),
+                ))
+                .await
+            {
+                Ok((response, winner_node)) => {
+                    // Completed this pair; move on to the next pair in the iterator
+                    winner = winner_node.clone();
+                    response
+                }
+                Err(e) => {
+                    // Both the main and the hedge request failed; record the error for both nodes.
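+                    // (the same underlying error is cloned so each node gets its own entry)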
+                    push_error(
+                        &mut collected_errors,
+                        node,
+                        HandlerError::PhyiscalError(e.clone()),
+                    );
+                    if let Some(hedge_node) = next.clone() {
+                        push_error(
+                            &mut collected_errors,
+                            hedge_node,
+                            HandlerError::PhyiscalError(e),
+                        );
+                    }
+                    continue;
+                }
+            }
+        } else {
+            // No hedge available; single node
+            match proxy_to_single_node(state, request.clone(), &node)
+                .instrument(info_span!(
+                    "connection",
+                    node = node_uri,
+                    tor = state.tor_client.is_some(),
+                ))
+                .await
+            {
+                Ok(response) => response,
+                Err(e) => {
+                    push_error(&mut collected_errors, node, HandlerError::PhyiscalError(e));
+                    continue;
+                }
+            }
+        };
 
         // Calculate the latency
         let latency = latency.elapsed().as_millis() as f64;
 
-        // Convert response to streamable to check first 1KB for errors
-        let streamable_response = StreamableResponse::from_response_with_tracking(
-            response,
-            Some(state.node_pool.clone()),
-        )
-        .await
-        .map_err(|e| {
-            HandlerError::CloneRequestError(format!("Failed to buffer response: {}", e))
-        })?;
+        // Fully buffer the response before forwarding it to the caller
+        let buffered_response = CloneableResponse::from_response(response)
+            .await
+            .map_err(|e| {
+                HandlerError::CloneRequestError(format!("Failed to buffer response: {}", e))
+            })?;
 
-        let error = match streamable_response.get_jsonrpc_error() {
+        // Record total bytes for bandwidth statistics
+        state
+            .node_pool
+            .record_bandwidth(buffered_response.body.len() as u64);
+
+        let error = match buffered_response.get_jsonrpc_error() {
             Some(error) => {
-                // Check if we have already got two previous JSON-RPC errors
-                // If we did, we assume there is a reason for it
-                // We return the response as is (streaming).
+                // Check if we have already got two previous JSON-RPC errors.
+                // If we did, we assume there is a reason for it and return the response anyway.
                 if collected_errors
                     .iter()
                     .filter(|(_, error)| matches!(error, HandlerError::JsonRpcError(_)))
                     .count()
                     >= 2
                 {
-                    return Ok(streamable_response.into_response());
+                    return Ok(buffered_response.to_response());
                 }
 
                 Some(HandlerError::JsonRpcError(error))
             }
-            None if streamable_response.status().is_client_error()
-                || streamable_response.status().is_server_error() =>
+            None if buffered_response.status().is_client_error()
+                || buffered_response.status().is_server_error() =>
             {
-                Some(HandlerError::HttpError(streamable_response.status()))
+                Some(HandlerError::HttpError(buffered_response.status()))
             }
             _ => None,
         };
 
         match error {
             Some(error) => {
-                push_error(&mut collected_errors, node, error);
+                push_error(&mut collected_errors, winner, error);
             }
             None => {
                 tracing::trace!(
-                    "Proxy request to {} succeeded, streaming response",
-                    node_uri
+                    "Proxy request to {} succeeded, returning buffered response",
+                    display_node(&winner)
                 );
 
                 // Only record errors if we have gotten a successful response
-                // This helps prevent logging errors if its our likely our fault (no internet)
-                for (node, _) in collected_errors.iter() {
-                    record_failure(&state, &node.0, &node.1, node.2).await;
+                // This helps prevent logging errors if it's likely our fault (e.g. no internet).
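+                // (a success here proves we have connectivity, so the recorded failures were the nodes' fault)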
+                for (node_failed, _) in collected_errors.iter() {
+                    record_failure(&state, &node_failed.0, &node_failed.1, node_failed.2).await;
                 }
 
                 // Record the success with actual latency
-                record_success(&state, &node.0, &node.1, node.2, latency).await;
+                record_success(&state, &winner.0, &winner.1, winner.2, latency).await;
 
-                // Finally return the successful streaming response
-                return Ok(streamable_response.into_response());
+                // Return the buffered response (no streaming)
+                return Ok(buffered_response.into_response());
             }
         }
     }
@@ -217,6 +294,94 @@ async fn maybe_wrap_with_tls(
     }
 }
 
+/// Proxies a singular axum::Request to a given main node with a specified hedge node.
+/// If the main node's response hasn't finished after SOFT_TIMEOUT, we proxy to the hedge node.
+/// We then race the two responses and return the one that finishes first (and is not an error).
+async fn proxy_to_node_with_hedge(
+    state: &crate::AppState,
+    request: CloneableRequest,
+    main_node: &(String, String, u16),
+    hedge_node: &(String, String, u16),
+) -> Result<(Response, (String, String, u16)), SingleRequestError> {
+    use std::future::Future;
+
+    // Start the main request immediately
+    let mut main_fut = Box::pin(proxy_to_single_node(state, request.clone(), main_node));
+
+    // Hedge request will be started after the soft timeout, unless the main fails first
+    let mut hedge_fut: Option<
+        Pin<Box<dyn Future<Output = Result<Response, SingleRequestError>> + Send>>,
+    > = None;
+
+    // Timer to trigger the hedge request
+    let mut soft_timer = Box::pin(tokio::time::sleep(SOFT_TIMEOUT));
+    let mut soft_timer_armed = true;
+
+    // If the main fails, keep its error to return if hedge also fails
+    let mut main_error: Option<SingleRequestError> = None;
+
+    loop {
+        // A future that awaits the hedge if present; otherwise stays pending
+        let mut hedge_wait = futures::future::poll_fn(|cx| {
+            if let Some(f) = hedge_fut.as_mut() {
+                f.as_mut().poll(cx)
+            } else {
+                std::task::Poll::Pending
+            }
+        });
+
+        tokio::select! {
+            res = &mut main_fut => {
+                match res {
+                    Ok(resp) => return Ok((resp, main_node.clone())),
+                    Err(err) => {
+                        // Start hedge immediately if not yet started
+                        main_error = Some(err);
+                        if hedge_fut.is_none() {
+                            tracing::debug!("Starting hedge request");
+                            hedge_fut = Some(Box::pin(proxy_to_single_node(state, request.clone(), hedge_node)));
+                        }
+
+                        // If hedge exists, await it and prefer its result
+                        if let Some(hf) = &mut hedge_fut {
+                            let hedge_res = hf.await;
+                            return hedge_res
+                                .map(|resp| (resp, hedge_node.clone()))
+                                .map_err(|_| main_error.take().unwrap());
+                        } else {
+                            return Err(main_error.take().unwrap());
+                        }
+                    }
+                }
+            }
+
+            // Start hedge after soft timeout if not already started
+            _ = &mut soft_timer, if soft_timer_armed => {
+                // Disarm timer so it does not keep firing
+                soft_timer_armed = false;
+                if hedge_fut.is_none() {
+                    tracing::debug!("Starting hedge request");
+                    hedge_fut = Some(Box::pin(proxy_to_single_node(state, request.clone(), hedge_node)));
+                }
+            }
+
+            // If hedge is started, also race it
+            res = &mut hedge_wait => {
+                match res {
+                    Ok(resp) => return Ok((resp, hedge_node.clone())),
+                    Err(_hedge_err) => {
+                        // Hedge failed; if main already failed, return main's error, otherwise keep waiting on main
+                        if let Some(err) = main_error.take() {
+                            return Err(err);
+                        }
+                        hedge_fut = None;
+                    }
+                }
+            }
+        }
+    }
+}
+
 /// Proxies a singular axum::Request to a single node.
 /// Errors if we get a physical connection error
 ///
@@ -314,12 +479,23 @@ async fn proxy_to_single_node(
     };
 
     // Convert hyper Response to axum Response
+    // Buffer the entire response to avoid "end of file before message length reached" errors
     let (parts, body) = response.into_parts();
 
-    let stream = body
-        .into_data_stream()
-        .map(|result| result.map_err(|e| axum::Error::new(e)));
-    let axum_body = Body::from_stream(stream);
+    // Collect the entire body into memory
+    let body_bytes = match body.collect().await {
+        Ok(collected) => collected.to_bytes().to_vec(),
+        Err(e) => {
+            // If we fail to read the full body, mark connection as failed
+            guarded_sender.mark_failed().await;
+            return Err(SingleRequestError::SendRequestError(format!(
+                "Failed to read response body: {}",
+                e
+            )));
+        }
+    };
+
+    let axum_body = Body::from(body_bytes);
 
     Ok(Response::from_parts(parts, axum_body))
 }
@@ -332,7 +508,7 @@ fn get_jsonrpc_error(body: &[u8]) -> Option<String> {
             .and_then(|e| e.as_str().map(|s| s.to_string()));
     }
 
-    // If we can't parse JSON, treat it as an error
+    // If we can't parse JSON, don't treat it as an error
     None
 }
 
diff --git a/monero-rpc-pool/src/types.rs b/monero-rpc-pool/src/types.rs
index b3b8c947..c93e853e 100644
--- a/monero-rpc-pool/src/types.rs
+++ b/monero-rpc-pool/src/types.rs
@@ -67,24 +67,6 @@ impl NodeHealthStats {
             self.success_count as f64 / total as f64
         }
     }
-
-    pub fn reliability_score(&self) -> f64 {
-        let success_rate = self.success_rate();
-        let total_requests = self.success_count + self.failure_count;
-
-        // Weight success rate by total requests (more requests = more reliable data)
-        let request_weight = (total_requests as f64).min(200.0) / 200.0;
-        let mut score = success_rate * request_weight;
-
-        // Factor in latency - lower latency = higher score
-        if let Some(avg_latency) = self.avg_latency_ms {
-            // Normalize latency to 0-1 range (assuming 0-2000ms range)
-            let latency_factor = 1.0 - (avg_latency.min(2000.0) / 2000.0);
-            score = score * 0.8 + latency_factor * 0.2; // 80% success rate, 20% latency
-        }
-
-        score
-    }
 }
 
 /// A complete node record combining address, metadata, and health stats
@@ -114,8 +96,4 @@ impl NodeRecord {
     pub fn success_rate(&self) -> f64 {
         self.health.success_rate()
     }
-
-    pub fn reliability_score(&self) -> f64 {
-        self.health.reliability_score()
-    }
 }
diff --git a/swap/src/cli/api.rs b/swap/src/cli/api.rs
index c80eb887..ac37e2f1 100644
--- a/swap/src/cli/api.rs
+++ b/swap/src/cli/api.rs
@@ -860,7 +860,9 @@ async fn open_monero_wallet(
             // None means the user rejected the password request
             // We prompt him to select a wallet again
             None => {
-                seed_choice = request_seed_choice(tauri_handle.clone().unwrap(), database).await?;
+                seed_choice =
+                    request_seed_choice(tauri_handle.clone().unwrap(), database)
+                        .await?;
                 continue;
             }
         };
diff --git a/swap/src/common/tracing_util.rs b/swap/src/common/tracing_util.rs
index d41b999d..eff7750f 100644
--- a/swap/src/common/tracing_util.rs
+++ b/swap/src/common/tracing_util.rs
@@ -76,9 +76,10 @@ pub fn init(
         "swap_env",
         "swap_fs",
         "swap_serde",
+        "monero_rpc_pool",
     ];
 
-    let INFO_LEVEL_CRATES: Vec<&str> = vec!["monero_rpc_pool"];
+    let INFO_LEVEL_CRATES: Vec<&str> = vec![];
 
     // General log file for non-verbose logs
     let file_appender: RollingFileAppender = tracing_appender::rolling::never(&dir, "swap-all.log");
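
Illustrative note (not part of the diff): the control flow that proxy_to_node_with_hedge implements above — start the main request, arm a soft timeout, start a hedge only if the main request is still running when the timer fires, then race the two — can be reduced to the self-contained Rust sketch below. Here fetch, the node labels, and the latencies are hypothetical stand-ins for proxy_to_single_node and the real SOFT_TIMEOUT; it assumes tokio with the macros, rt, and time features.

use std::time::Duration;
use tokio::time::sleep;

// Hypothetical stand-in for proxy_to_single_node: resolves after `latency`.
async fn fetch(node: &'static str, latency: Duration) -> Result<String, String> {
    sleep(latency).await;
    Ok(format!("response from {node}"))
}

// Race a main request against a hedge that is only started once the soft
// timeout elapses, returning the first success and the node that produced it.
async fn hedged_fetch(
    main_node: &'static str,
    hedge_node: &'static str,
    main_latency: Duration,
    hedge_latency: Duration,
    soft_timeout: Duration,
) -> Result<(String, &'static str), String> {
    let main_fut = fetch(main_node, main_latency);
    tokio::pin!(main_fut);

    // Phase 1: give the main request a head start of `soft_timeout`.
    tokio::select! {
        res = &mut main_fut => match res {
            Ok(r) => return Ok((r, main_node)),
            // Main failed early: fall back to the hedge immediately.
            Err(_) => return fetch(hedge_node, hedge_latency)
                .await
                .map(|r| (r, hedge_node)),
        },
        _ = sleep(soft_timeout) => {}
    }

    // Phase 2: the main request is slow; start the hedge and race the two.
    let hedge_fut = fetch(hedge_node, hedge_latency);
    tokio::pin!(hedge_fut);
    tokio::select! {
        res = &mut main_fut => match res {
            Ok(r) => Ok((r, main_node)),
            // Main failed: the hedge is the last chance.
            Err(e) => hedge_fut.await.map(|r| (r, hedge_node)).map_err(|_| e),
        },
        res = &mut hedge_fut => match res {
            Ok(r) => Ok((r, hedge_node)),
            // Hedge failed first: keep waiting for the main request.
            Err(_) => main_fut.await.map(|r| (r, main_node)),
        },
    }
}

#[tokio::main]
async fn main() {
    let (body, winner) = hedged_fetch(
        "node-a",
        "node-b",
        Duration::from_secs(5), // main is slow
        Duration::from_secs(1), // hedge is fast
        Duration::from_secs(2), // hedge fires at t = 2 s and wins at t = 3 s
    )
    .await
    .expect("at least one node should answer");
    println!("{winner} answered first: {body}");
}

The point of the soft timeout is that the hedge only costs a second request on the slow tail rather than on every call; SOFT_TIMEOUT = TIMEOUT / 2 in the diff makes the same trade-off.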