feat(monero-rpc-pool): Add randomness to node selection, prefer established TCP circuits (#508)

* feat(monero-rpc-pool): Add randomness to node selection, prefer established TCP circuits

* amend changelog

* fix
Mohan 2025-08-10 17:46:39 +02:00 committed by GitHub
parent 9ccbb1816c
commit 3163ca7cd3
10 changed files with 310 additions and 124 deletions


@@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- GUI: Speed up startup by concurrently bootstrapping Tor and prompting the user to select a wallet
- GUI: Add a white background to the QR code modal to make it easier to scan
- GUI + CLI + ASB: Add `/dns4/rendezvous.observer/tcp/8888/p2p/12D3KooWMjceGXrYuGuDMGrfmJxALnSDbK4km6s1i1sJEgDTgGQa` to the default list of rendezvous points
- GUI + CLI + ASB: Monero RPC pool now prioritizes nodes with pre-established TCP connections
## [3.0.0-beta.6] - 2025-08-07


@@ -1,28 +0,0 @@
{
"db_name": "SQLite",
"query": "\n SELECT \n n.scheme,\n n.host,\n n.port\n FROM monero_nodes n\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count\n FROM (\n SELECT node_id, was_successful\n FROM health_checks \n ORDER BY timestamp DESC \n LIMIT 1000\n ) recent_checks\n GROUP BY node_id\n ) stats ON n.id = stats.node_id\n WHERE n.network = ?\n ORDER BY \n CASE \n WHEN (COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0)) > 0 \n THEN CAST(COALESCE(stats.success_count, 0) AS REAL) / CAST(COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0) AS REAL)\n ELSE 0.0 \n END DESC\n LIMIT ?\n ",
"describe": {
"columns": [
{
"name": "scheme",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "host",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "port",
"ordinal": 2,
"type_info": "Integer"
}
],
"parameters": {
"Right": 2
},
"nullable": [false, false, false]
},
"hash": "44ddff5bdf5b56e9c1a9848641181de4441c8974b2d1304804874cf620420ad4"
}


@@ -0,0 +1,28 @@
{
"db_name": "SQLite",
"query": "\n WITH scored AS (\n SELECT \n n.scheme,\n n.host,\n n.port,\n CASE \n WHEN (COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0)) > 0 \n THEN CAST(COALESCE(stats.success_count, 0) AS REAL) / CAST(COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0) AS REAL)\n ELSE 0.0 \n END as base_score,\n MAX(\n ABS(RANDOM()) / CAST(0x7fffffffffffffff AS REAL),\n ABS(RANDOM()) / CAST(0x7fffffffffffffff AS REAL),\n ABS(RANDOM()) / CAST(0x7fffffffffffffff AS REAL)\n ) as r\n FROM monero_nodes n\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count\n FROM (\n SELECT node_id, was_successful\n FROM health_checks \n ORDER BY timestamp DESC \n LIMIT 1000\n ) recent_checks\n GROUP BY node_id\n ) stats ON n.id = stats.node_id\n WHERE n.network = ?\n )\n SELECT scheme, host, port\n FROM scored\n ORDER BY (base_score * r) DESC, r DESC\n LIMIT ?\n ",
"describe": {
"columns": [
{
"name": "scheme",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "host",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "port",
"ordinal": 2,
"type_info": "Integer"
}
],
"parameters": {
"Right": 2
},
"nullable": [false, false, false]
},
"hash": "4ce7c42906ba69e0c8e1c0dad952956edd582a0edecd45710e22dcb28785eeab"
}


@@ -143,7 +143,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Arc::new(
reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(3 * 60 + 30)) // used in wallet2
.timeout(std::time::Duration::from_secs(10 * 60 + 30)) // used in wallet2
.build()
.expect("Failed to build reqwest client"),
);


@@ -175,4 +175,18 @@ impl ConnectionPool {
}
}
}
/// Check if there's an available (unlocked) connection for the given key.
pub async fn has_available_connection(&self, key: &StreamKey) -> bool {
let map = self.inner.read().await;
if let Some(vec_lock) = map.get(key) {
let vec = vec_lock.read().await;
for sender_mutex in vec.iter() {
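// try_lock succeeds only if no in-flight request currently holds this connection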
if sender_mutex.try_lock().is_ok() {
return true;
}
}
}
false
}
}


@@ -232,17 +232,33 @@ impl Database {
}
/// Get top nodes based on success rate
/// Adds randomness
pub async fn get_top_nodes_by_recent_success(
&self,
network: &str,
limit: i64,
) -> Result<Vec<NodeAddress>> {
// Randomized ordering: r = max of 3 Uniform(0,1) (biased toward 1).
// Rank by (base_score * r) so top nodes remain preferred but can shuffle.
// r is drawn once per row in the CTE and reused in ORDER BY.
// Increase RANDOM() terms in MAX(...) to strengthen the bias.
let rows = sqlx::query!(
r#"
WITH scored AS (
SELECT
n.scheme,
n.host,
n.port
n.port,
CASE
WHEN (COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0)) > 0
THEN CAST(COALESCE(stats.success_count, 0) AS REAL) / CAST(COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0) AS REAL)
ELSE 0.0
END as base_score,
MAX(
ABS(RANDOM()) / CAST(0x7fffffffffffffff AS REAL),
ABS(RANDOM()) / CAST(0x7fffffffffffffff AS REAL),
ABS(RANDOM()) / CAST(0x7fffffffffffffff AS REAL)
) as r
FROM monero_nodes n
LEFT JOIN (
SELECT
@@ -258,12 +274,10 @@ impl Database {
GROUP BY node_id
) stats ON n.id = stats.node_id
WHERE n.network = ?
ORDER BY
CASE
WHEN (COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0)) > 0
THEN CAST(COALESCE(stats.success_count, 0) AS REAL) / CAST(COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0) AS REAL)
ELSE 0.0
END DESC
)
SELECT scheme, host, port
FROM scored
ORDER BY (base_score * r) DESC, r DESC
LIMIT ?
"#,
network,

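For context on the randomized ordering above: each row draws r as the maximum of three Uniform(0,1) samples, so P(r <= x) = x^3 and E[r] = 3/4, which biases r toward 1 while still letting lower-ranked nodes occasionally win. A minimal standalone sketch of that sampling scheme (assuming the `rand` crate; not part of this commit):

use rand::Rng;

fn main() {
    let mut rng = rand::thread_rng();
    let samples = 100_000;
    // r = max of three Uniform(0,1) draws, mirroring the three RANDOM() terms
    // inside MAX(...) in the SQL above
    let mean: f64 = (0..samples)
        .map(|_| {
            let (a, b, c): (f64, f64, f64) = (rng.gen(), rng.gen(), rng.gen());
            a.max(b).max(c)
        })
        .sum::<f64>()
        / samples as f64;
    // E[max of three uniforms] = 3/4; adding draws pushes the mean closer to 1
    println!("mean r = {mean:.3} (expected 0.750)");
}

Ranking by (base_score * r) keeps well-scoring nodes near the front of the LIMIT window while avoiding a fully deterministic, repeatable order.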

@@ -21,17 +21,20 @@ use tracing::{error, info_span, Instrument};
use crate::AppState;
/// wallet2.h has a default timeout of 3 minutes + 30 seconds.
/// We assume this is a reasonable timeout. We use half of that to allow us do a single retry.
/// We assume this is a reasonable timeout. We use half of that.
/// https://github.com/SNeedlewoods/seraphis_wallet/blob/5f714f147fd29228698070e6bd80e41ce2f86fb0/src/wallet/wallet2.h#L238
static TIMEOUT: Duration = Duration::from_secs(3 * 60 + 30).checked_div(2).unwrap();
/// If the main node does not finish within this period, we start a hedged request.
static SOFT_TIMEOUT: Duration = TIMEOUT.checked_div(2).unwrap();
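// i.e. TIMEOUT = 210 s / 2 = 105 s, and SOFT_TIMEOUT = 105 s / 2 = 52.5 s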
/// Trait alias for a stream that can be used with hyper
trait HyperStream: AsyncRead + AsyncWrite + Unpin + Send {}
impl<T: AsyncRead + AsyncWrite + Unpin + Send> HyperStream for T {}
#[axum::debug_handler]
pub async fn proxy_handler(State(state): State<AppState>, request: Request) -> Response {
static POOL_SIZE: usize = 10;
static POOL_SIZE: usize = 20;
// Get the pool of nodes
let available_pool = state
@@ -88,6 +91,38 @@ async fn proxy_to_multiple_nodes(
return Err(HandlerError::NoNodes);
}
// Sort nodes to prioritize those with available connections
// Check if we're using Tor for this request
let use_tor = match &state.tor_client {
Some(tc)
if tc.bootstrap_status().ready_for_traffic() && !request.clearnet_whitelisted() =>
{
true
}
_ => false,
};
// Create a vector of (node, has_connection) pairs
let mut nodes_with_availability = Vec::new();
for node in nodes.iter() {
let key = (node.0.clone(), node.1.clone(), node.2, use_tor);
let has_connection = state.connection_pool.has_available_connection(&key).await;
nodes_with_availability.push((node.clone(), has_connection));
}
// Sort: nodes with available connections come first
nodes_with_availability.sort_by(|a, b| {
// If a has connection and b doesn't, a comes first
// If both have or both don't have, maintain original order
b.1.cmp(&a.1)
});
// Extract just the sorted nodes
let nodes: Vec<(String, String, u16)> = nodes_with_availability
.into_iter()
.map(|(node, _)| node)
.collect();
let mut collected_errors: Vec<((String, String, u16), HandlerError)> = Vec::new();
fn push_error(
@@ -104,14 +139,54 @@ async fn proxy_to_multiple_nodes(
// Success is defined as either:
// - a raw HTTP response with a 200 response code
// - a JSON-RPC response with status code 200 and no error field
for node in nodes {
// Node attempt logging without creating spans to reduce overhead
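// Iterate over the ranked nodes in pairs: pair[0] is the main node, pair[1] (if present) is its hedge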
for pair in nodes.chunks(2) {
let node = pair[0].clone();
let next = pair.get(1).cloned();
let node_uri = display_node(&node);
// Start timing the request
let latency = std::time::Instant::now();
let response = match proxy_to_single_node(state, request.clone(), &node)
let mut winner = node.clone();
let response = if let Some(hedge_node) = next.as_ref() {
let hedge_node_uri = display_node(hedge_node);
// Use hedged proxy: race node vs next
match proxy_to_node_with_hedge(state, request.clone(), &node, hedge_node)
.instrument(info_span!(
"connection",
node = node_uri,
hedge_node = hedge_node_uri,
tor = state.tor_client.is_some(),
))
.await
{
Ok((response, winner_node)) => {
// Completed this pair; move on to next pair in iterator
winner = winner_node.clone();
response
}
Err(e) => {
// Pair failed: the hedge is started as soon as the main fails, so an Err here means both failed. Record the error against both nodes.
push_error(
&mut collected_errors,
node,
HandlerError::PhyiscalError(e.clone()),
);
if let Some(hedge_node) = next.clone() {
push_error(
&mut collected_errors,
hedge_node,
HandlerError::PhyiscalError(e),
);
}
continue;
}
}
} else {
// No hedge available; single node
match proxy_to_single_node(state, request.clone(), &node)
.instrument(info_span!(
"connection",
node = node_uri,
@@ -124,66 +199,68 @@ async fn proxy_to_multiple_nodes(
push_error(&mut collected_errors, node, HandlerError::PhyiscalError(e));
continue;
}
}
};
// Calculate the latency
let latency = latency.elapsed().as_millis() as f64;
// Convert response to streamable to check first 1KB for errors
let streamable_response = StreamableResponse::from_response_with_tracking(
response,
Some(state.node_pool.clone()),
)
// Fully buffer the response before forwarding it to the caller
let buffered_response = CloneableResponse::from_response(response)
.await
.map_err(|e| {
HandlerError::CloneRequestError(format!("Failed to buffer response: {}", e))
})?;
let error = match streamable_response.get_jsonrpc_error() {
// Record total bytes for bandwidth statistics
state
.node_pool
.record_bandwidth(buffered_response.body.len() as u64);
let error = match buffered_response.get_jsonrpc_error() {
Some(error) => {
// Check if we have already got two previous JSON-RPC errors
// If we did, we assume there is a reason for it
// We return the response as is (streaming).
// Check if we have already got two previous JSON-RPC errors.
// If we did, we assume there is a reason for it and return the response anyway.
if collected_errors
.iter()
.filter(|(_, error)| matches!(error, HandlerError::JsonRpcError(_)))
.count()
>= 2
{
return Ok(streamable_response.into_response());
return Ok(buffered_response.to_response());
}
Some(HandlerError::JsonRpcError(error))
}
None if streamable_response.status().is_client_error()
|| streamable_response.status().is_server_error() =>
None if buffered_response.status().is_client_error()
|| buffered_response.status().is_server_error() =>
{
Some(HandlerError::HttpError(streamable_response.status()))
Some(HandlerError::HttpError(buffered_response.status()))
}
_ => None,
};
match error {
Some(error) => {
push_error(&mut collected_errors, node, error);
push_error(&mut collected_errors, winner, error);
}
None => {
tracing::trace!(
"Proxy request to {} succeeded, streaming response",
node_uri
"Proxy request to {} succeeded, returning buffered response",
display_node(&winner)
);
// Only record errors if we have gotten a successful response
// This helps prevent logging errors if its our likely our fault (no internet)
for (node, _) in collected_errors.iter() {
record_failure(&state, &node.0, &node.1, node.2).await;
// This helps prevent logging errors if it's likely our fault (e.g. no internet).
for (node_failed, _) in collected_errors.iter() {
record_failure(&state, &node_failed.0, &node_failed.1, node_failed.2).await;
}
// Record the success with actual latency
record_success(&state, &node.0, &node.1, node.2, latency).await;
record_success(&state, &winner.0, &winner.1, winner.2, latency).await;
// Finally return the successful streaming response
return Ok(streamable_response.into_response());
// Return the buffered response (no streaming)
return Ok(buffered_response.into_response());
}
}
}
@@ -217,6 +294,94 @@ async fn maybe_wrap_with_tls(
}
}
/// Proxies a singular axum::Request to a given main node with a specified hedge node.
/// If the main node's response hasn't finished after SOFT_TIMEOUT, we also proxy to the hedge node.
/// We then race the two responses, and return the one that finishes first (and is not an error)
async fn proxy_to_node_with_hedge(
state: &crate::AppState,
request: CloneableRequest,
main_node: &(String, String, u16),
hedge_node: &(String, String, u16),
) -> Result<(Response, (String, String, u16)), SingleRequestError> {
use std::future::Future;
// Start the main request immediately
let mut main_fut = Box::pin(proxy_to_single_node(state, request.clone(), main_node));
// Hedge request will be started after the soft timeout, unless the main fails first
let mut hedge_fut: Option<
Pin<Box<dyn Future<Output = Result<Response, SingleRequestError>> + Send>>,
> = None;
// Timer to trigger the hedge request
let mut soft_timer = Box::pin(tokio::time::sleep(SOFT_TIMEOUT));
let mut soft_timer_armed = true;
// If the main fails, keep its error to return if hedge also fails
let mut main_error: Option<SingleRequestError> = None;
loop {
// A future that awaits the hedge if present; otherwise stays pending
let mut hedge_wait = futures::future::poll_fn(|cx| {
if let Some(f) = hedge_fut.as_mut() {
f.as_mut().poll(cx)
} else {
std::task::Poll::Pending
}
});
tokio::select! {
res = &mut main_fut => {
match res {
Ok(resp) => return Ok((resp, main_node.clone())),
Err(err) => {
// Start hedge immediately if not yet started
main_error = Some(err);
if hedge_fut.is_none() {
tracing::debug!("Starting hedge request");
hedge_fut = Some(Box::pin(proxy_to_single_node(state, request.clone(), hedge_node)));
}
// If hedge exists, await it and prefer its result
if let Some(hf) = &mut hedge_fut {
let hedge_res = hf.await;
return hedge_res
.map(|resp| (resp, hedge_node.clone()))
.or_else(|_| Err(main_error.take().unwrap()));
} else {
return Err(main_error.take().unwrap());
}
}
}
}
// Start hedge after soft timeout if not already started
_ = &mut soft_timer, if soft_timer_armed => {
// Disarm timer so it does not keep firing
soft_timer_armed = false;
if hedge_fut.is_none() {
tracing::debug!("Starting hedge request");
hedge_fut = Some(Box::pin(proxy_to_single_node(state, request.clone(), hedge_node)));
}
}
// If hedge is started, also race it
res = &mut hedge_wait => {
match res {
Ok(resp) => return Ok((resp, hedge_node.clone())),
Err(_hedge_err) => {
// Hedge failed; if main already failed, return main's error, otherwise keep waiting on main
if let Some(err) = main_error.take() {
return Err(err);
}
hedge_fut = None;
}
}
}
}
}
}
/// Proxies a singular axum::Request to a single node.
/// Errors if we get a physical connection error
///
@@ -314,12 +479,23 @@ async fn proxy_to_single_node(
};
// Convert hyper Response<Incoming> to axum Response<Body>
// Buffer the entire response to avoid "end of file before message length reached" errors
let (parts, body) = response.into_parts();
let stream = body
.into_data_stream()
.map(|result| result.map_err(|e| axum::Error::new(e)));
let axum_body = Body::from_stream(stream);
// Collect the entire body into memory
let body_bytes = match body.collect().await {
Ok(collected) => collected.to_bytes().to_vec(),
Err(e) => {
// If we fail to read the full body, mark connection as failed
guarded_sender.mark_failed().await;
return Err(SingleRequestError::SendRequestError(format!(
"Failed to read response body: {}",
e
)));
}
};
let axum_body = Body::from(body_bytes);
Ok(Response::from_parts(parts, axum_body))
}
@@ -332,7 +508,7 @@ fn get_jsonrpc_error(body: &[u8]) -> Option<String> {
.and_then(|e| e.as_str().map(|s| s.to_string()));
}
// If we can't parse JSON, treat it as an error
// If we can't parse JSON, don't treat it as an error
None
}


@@ -67,24 +67,6 @@ impl NodeHealthStats {
self.success_count as f64 / total as f64
}
}
pub fn reliability_score(&self) -> f64 {
let success_rate = self.success_rate();
let total_requests = self.success_count + self.failure_count;
// Weight success rate by total requests (more requests = more reliable data)
let request_weight = (total_requests as f64).min(200.0) / 200.0;
let mut score = success_rate * request_weight;
// Factor in latency - lower latency = higher score
if let Some(avg_latency) = self.avg_latency_ms {
// Normalize latency to 0-1 range (assuming 0-2000ms range)
let latency_factor = 1.0 - (avg_latency.min(2000.0) / 2000.0);
score = score * 0.8 + latency_factor * 0.2; // 80% success rate, 20% latency
}
score
}
}
/// A complete node record combining address, metadata, and health stats
@@ -114,8 +96,4 @@ impl NodeRecord {
pub fn success_rate(&self) -> f64 {
self.health.success_rate()
}
pub fn reliability_score(&self) -> f64 {
self.health.reliability_score()
}
}


@@ -860,7 +860,9 @@ async fn open_monero_wallet(
// None means the user rejected the password request
// We prompt him to select a wallet again
None => {
seed_choice = request_seed_choice(tauri_handle.clone().unwrap(), database).await?;
seed_choice =
request_seed_choice(tauri_handle.clone().unwrap(), database)
.await?;
continue;
}
};


@@ -76,9 +76,10 @@ pub fn init(
"swap_env",
"swap_fs",
"swap_serde",
"monero_rpc_pool",
];
let INFO_LEVEL_CRATES: Vec<&str> = vec!["monero_rpc_pool"];
let INFO_LEVEL_CRATES: Vec<&str> = vec![];
// General log file for non-verbose logs
let file_appender: RollingFileAppender = tracing_appender::rolling::never(&dir, "swap-all.log");