refactor(monero-rpc-pool): Type safe SQL, simplify discovery (#443)

* refactor(monero-rpc-pool): Type safe SQL, default nodes, no monero.fail

* refactor

* refactor

* refactoring

* fmt

* add some randomness to node selection
This commit is contained in:
Mohan 2025-06-30 10:53:01 +02:00 committed by GitHub
parent ef3c9139d1
commit cc4069ebad
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 467 additions and 2323 deletions

1
Cargo.lock generated
View file

@ -5974,7 +5974,6 @@ dependencies = [
"axum", "axum",
"chrono", "chrono",
"clap 4.5.40", "clap 4.5.40",
"dirs 5.0.1",
"futures", "futures",
"monero", "monero",
"monero-rpc", "monero-rpc",

View file

@ -1,18 +0,0 @@
{
"db_name": "SQLite",
"query": "SELECT id FROM monero_nodes WHERE scheme = ? AND host = ? AND port = ?",
"describe": {
"columns": [
{
"name": "id",
"ordinal": 0,
"type_info": "Integer"
}
],
"parameters": {
"Right": 3
},
"nullable": [true]
},
"hash": "03e5b2bccf8bffb962a56443448311800bb832efe37fe6c52181bd7bf631740c"
}

View file

@ -1,110 +0,0 @@
{
"db_name": "SQLite",
"query": "\n SELECT \n n.id as \"id!: i64\",\n n.scheme,\n n.host,\n n.port,\n n.network,\n n.first_seen_at,\n CAST(COALESCE(stats.success_count, 0) AS INTEGER) as \"success_count!: i64\",\n CAST(COALESCE(stats.failure_count, 0) AS INTEGER) as \"failure_count!: i64\",\n stats.last_success as \"last_success?: String\",\n stats.last_failure as \"last_failure?: String\",\n stats.last_checked as \"last_checked?: String\",\n CAST(CASE WHEN reliable_nodes.node_id IS NOT NULL THEN 1 ELSE 0 END AS INTEGER) as \"is_reliable!: i64\",\n stats.avg_latency_ms as \"avg_latency_ms?: f64\",\n stats.min_latency_ms as \"min_latency_ms?: f64\",\n stats.max_latency_ms as \"max_latency_ms?: f64\",\n stats.last_latency_ms as \"last_latency_ms?: f64\"\n FROM monero_nodes n\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,\n MAX(CASE WHEN was_successful THEN timestamp END) as last_success,\n MAX(CASE WHEN NOT was_successful THEN timestamp END) as last_failure,\n MAX(timestamp) as last_checked,\n AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms,\n MIN(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as min_latency_ms,\n MAX(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as max_latency_ms,\n (SELECT latency_ms FROM health_checks hc2 WHERE hc2.node_id = health_checks.node_id ORDER BY timestamp DESC LIMIT 1) as last_latency_ms\n FROM health_checks \n GROUP BY node_id\n ) stats ON n.id = stats.node_id\n LEFT JOIN (\n SELECT DISTINCT node_id FROM (\n SELECT \n n2.id as node_id,\n COALESCE(s2.success_count, 0) as success_count,\n COALESCE(s2.failure_count, 0) as failure_count,\n s2.avg_latency_ms,\n (CAST(COALESCE(s2.success_count, 0) AS REAL) / CAST(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0) AS REAL)) * \n (MIN(COALESCE(s2.success_count, 0) + 
COALESCE(s2.failure_count, 0), 200) / 200.0) * 0.8 +\n CASE \n WHEN s2.avg_latency_ms IS NOT NULL THEN (1.0 - (MIN(s2.avg_latency_ms, 2000) / 2000.0)) * 0.2\n ELSE 0.0 \n END as reliability_score\n FROM monero_nodes n2\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,\n AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms\n FROM health_checks \n GROUP BY node_id\n ) s2 ON n2.id = s2.node_id\n WHERE n2.network = ? AND (COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0)) > 0\n ORDER BY reliability_score DESC\n LIMIT 4\n )\n ) reliable_nodes ON n.id = reliable_nodes.node_id\n WHERE n.network = ? AND (COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0)) > 0\n ORDER BY \n (CAST(COALESCE(stats.success_count, 0) AS REAL) / CAST(COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0) AS REAL)) DESC,\n stats.avg_latency_ms ASC\n LIMIT ?\n ",
"describe": {
"columns": [
{
"name": "id!: i64",
"ordinal": 0,
"type_info": "Integer"
},
{
"name": "scheme",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "host",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "port",
"ordinal": 3,
"type_info": "Integer"
},
{
"name": "network",
"ordinal": 4,
"type_info": "Text"
},
{
"name": "first_seen_at",
"ordinal": 5,
"type_info": "Text"
},
{
"name": "success_count!: i64",
"ordinal": 6,
"type_info": "Null"
},
{
"name": "failure_count!: i64",
"ordinal": 7,
"type_info": "Null"
},
{
"name": "last_success?: String",
"ordinal": 8,
"type_info": "Null"
},
{
"name": "last_failure?: String",
"ordinal": 9,
"type_info": "Null"
},
{
"name": "last_checked?: String",
"ordinal": 10,
"type_info": "Null"
},
{
"name": "is_reliable!: i64",
"ordinal": 11,
"type_info": "Null"
},
{
"name": "avg_latency_ms?: f64",
"ordinal": 12,
"type_info": "Null"
},
{
"name": "min_latency_ms?: f64",
"ordinal": 13,
"type_info": "Null"
},
{
"name": "max_latency_ms?: f64",
"ordinal": 14,
"type_info": "Null"
},
{
"name": "last_latency_ms?: f64",
"ordinal": 15,
"type_info": "Float"
}
],
"parameters": {
"Right": 3
},
"nullable": [
true,
false,
false,
false,
false,
false,
null,
null,
null,
null,
null,
null,
null,
null,
null,
true
]
},
"hash": "08d143b977a7fa23b289c22dee3cab4d64debeea9932c58047cc6244d136f80d"
}

View file

@ -1,110 +0,0 @@
{
"db_name": "SQLite",
"query": "\n SELECT \n n.id as \"id!: i64\",\n n.scheme,\n n.host,\n n.port,\n n.network,\n n.first_seen_at,\n CAST(COALESCE(stats.success_count, 0) AS INTEGER) as \"success_count!: i64\",\n CAST(COALESCE(stats.failure_count, 0) AS INTEGER) as \"failure_count!: i64\",\n stats.last_success as \"last_success?: String\",\n stats.last_failure as \"last_failure?: String\",\n stats.last_checked as \"last_checked?: String\",\n CAST(CASE WHEN reliable_nodes.node_id IS NOT NULL THEN 1 ELSE 0 END AS INTEGER) as \"is_reliable!: i64\",\n stats.avg_latency_ms as \"avg_latency_ms?: f64\",\n stats.min_latency_ms as \"min_latency_ms?: f64\",\n stats.max_latency_ms as \"max_latency_ms?: f64\",\n stats.last_latency_ms as \"last_latency_ms?: f64\"\n FROM monero_nodes n\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,\n MAX(CASE WHEN was_successful THEN timestamp END) as last_success,\n MAX(CASE WHEN NOT was_successful THEN timestamp END) as last_failure,\n MAX(timestamp) as last_checked,\n AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms,\n MIN(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as min_latency_ms,\n MAX(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as max_latency_ms,\n (SELECT latency_ms FROM health_checks hc2 WHERE hc2.node_id = health_checks.node_id ORDER BY timestamp DESC LIMIT 1) as last_latency_ms\n FROM health_checks \n GROUP BY node_id\n ) stats ON n.id = stats.node_id\n LEFT JOIN (\n SELECT DISTINCT node_id FROM (\n SELECT \n n2.id as node_id,\n COALESCE(s2.success_count, 0) as success_count,\n COALESCE(s2.failure_count, 0) as failure_count,\n s2.avg_latency_ms,\n (CAST(COALESCE(s2.success_count, 0) AS REAL) / CAST(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0) AS REAL)) * \n (MIN(COALESCE(s2.success_count, 0) + 
COALESCE(s2.failure_count, 0), 200) / 200.0) * 0.8 +\n CASE \n WHEN s2.avg_latency_ms IS NOT NULL THEN (1.0 - (MIN(s2.avg_latency_ms, 2000) / 2000.0)) * 0.2\n ELSE 0.0 \n END as reliability_score\n FROM monero_nodes n2\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,\n AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms\n FROM health_checks \n GROUP BY node_id\n ) s2 ON n2.id = s2.node_id\n WHERE n2.network = ? AND (COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0)) > 0\n ORDER BY reliability_score DESC\n LIMIT 4\n )\n ) reliable_nodes ON n.id = reliable_nodes.node_id\n WHERE n.network = ?\n ORDER BY stats.avg_latency_ms ASC, stats.success_count DESC\n ",
"describe": {
"columns": [
{
"name": "id!: i64",
"ordinal": 0,
"type_info": "Integer"
},
{
"name": "scheme",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "host",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "port",
"ordinal": 3,
"type_info": "Integer"
},
{
"name": "network",
"ordinal": 4,
"type_info": "Text"
},
{
"name": "first_seen_at",
"ordinal": 5,
"type_info": "Text"
},
{
"name": "success_count!: i64",
"ordinal": 6,
"type_info": "Null"
},
{
"name": "failure_count!: i64",
"ordinal": 7,
"type_info": "Null"
},
{
"name": "last_success?: String",
"ordinal": 8,
"type_info": "Null"
},
{
"name": "last_failure?: String",
"ordinal": 9,
"type_info": "Null"
},
{
"name": "last_checked?: String",
"ordinal": 10,
"type_info": "Null"
},
{
"name": "is_reliable!: i64",
"ordinal": 11,
"type_info": "Null"
},
{
"name": "avg_latency_ms?: f64",
"ordinal": 12,
"type_info": "Null"
},
{
"name": "min_latency_ms?: f64",
"ordinal": 13,
"type_info": "Null"
},
{
"name": "max_latency_ms?: f64",
"ordinal": 14,
"type_info": "Null"
},
{
"name": "last_latency_ms?: f64",
"ordinal": 15,
"type_info": "Float"
}
],
"parameters": {
"Right": 2
},
"nullable": [
true,
false,
false,
false,
false,
false,
null,
null,
null,
null,
null,
null,
null,
null,
null,
true
]
},
"hash": "0aa34e769813a40e0518f5311ff95685fd5278103d13f56795ff5a51b0ef8036"
}

View file

@ -1,110 +0,0 @@
{
"db_name": "SQLite",
"query": "\n SELECT \n n.id as \"id!: i64\",\n n.scheme,\n n.host,\n n.port,\n n.network,\n n.first_seen_at,\n CAST(COALESCE(stats.success_count, 0) AS INTEGER) as \"success_count!: i64\",\n CAST(COALESCE(stats.failure_count, 0) AS INTEGER) as \"failure_count!: i64\",\n stats.last_success as \"last_success?: String\",\n stats.last_failure as \"last_failure?: String\",\n stats.last_checked as \"last_checked?: String\",\n CAST(CASE WHEN reliable_nodes.node_id IS NOT NULL THEN 1 ELSE 0 END AS INTEGER) as \"is_reliable!: i64\",\n stats.avg_latency_ms as \"avg_latency_ms?: f64\",\n stats.min_latency_ms as \"min_latency_ms?: f64\",\n stats.max_latency_ms as \"max_latency_ms?: f64\",\n stats.last_latency_ms as \"last_latency_ms?: f64\"\n FROM monero_nodes n\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,\n MAX(CASE WHEN was_successful THEN timestamp END) as last_success,\n MAX(CASE WHEN NOT was_successful THEN timestamp END) as last_failure,\n MAX(timestamp) as last_checked,\n AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms,\n MIN(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as min_latency_ms,\n MAX(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as max_latency_ms,\n (SELECT latency_ms FROM health_checks hc2 WHERE hc2.node_id = health_checks.node_id ORDER BY timestamp DESC LIMIT 1) as last_latency_ms\n FROM health_checks \n GROUP BY node_id\n ) stats ON n.id = stats.node_id\n LEFT JOIN (\n SELECT DISTINCT node_id FROM (\n SELECT \n n2.id as node_id,\n COALESCE(s2.success_count, 0) as success_count,\n COALESCE(s2.failure_count, 0) as failure_count,\n s2.avg_latency_ms,\n (CAST(COALESCE(s2.success_count, 0) AS REAL) / CAST(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0) AS REAL)) * \n (MIN(COALESCE(s2.success_count, 0) + 
COALESCE(s2.failure_count, 0), 200) / 200.0) * 0.8 +\n CASE \n WHEN s2.avg_latency_ms IS NOT NULL THEN (1.0 - (MIN(s2.avg_latency_ms, 2000) / 2000.0)) * 0.2\n ELSE 0.0 \n END as reliability_score\n FROM monero_nodes n2\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,\n AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms\n FROM health_checks \n GROUP BY node_id\n ) s2 ON n2.id = s2.node_id\n WHERE n2.network = ? AND (COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0)) > 0\n ORDER BY reliability_score DESC\n LIMIT 4\n )\n ) reliable_nodes ON n.id = reliable_nodes.node_id\n WHERE n.network = ?\n ORDER BY RANDOM()\n LIMIT ?\n ",
"describe": {
"columns": [
{
"name": "id!: i64",
"ordinal": 0,
"type_info": "Integer"
},
{
"name": "scheme",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "host",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "port",
"ordinal": 3,
"type_info": "Integer"
},
{
"name": "network",
"ordinal": 4,
"type_info": "Text"
},
{
"name": "first_seen_at",
"ordinal": 5,
"type_info": "Text"
},
{
"name": "success_count!: i64",
"ordinal": 6,
"type_info": "Null"
},
{
"name": "failure_count!: i64",
"ordinal": 7,
"type_info": "Null"
},
{
"name": "last_success?: String",
"ordinal": 8,
"type_info": "Null"
},
{
"name": "last_failure?: String",
"ordinal": 9,
"type_info": "Null"
},
{
"name": "last_checked?: String",
"ordinal": 10,
"type_info": "Null"
},
{
"name": "is_reliable!: i64",
"ordinal": 11,
"type_info": "Null"
},
{
"name": "avg_latency_ms?: f64",
"ordinal": 12,
"type_info": "Null"
},
{
"name": "min_latency_ms?: f64",
"ordinal": 13,
"type_info": "Null"
},
{
"name": "max_latency_ms?: f64",
"ordinal": 14,
"type_info": "Null"
},
{
"name": "last_latency_ms?: f64",
"ordinal": 15,
"type_info": "Float"
}
],
"parameters": {
"Right": 3
},
"nullable": [
true,
false,
false,
false,
false,
false,
null,
null,
null,
null,
null,
null,
null,
null,
null,
true
]
},
"hash": "2a378cb109fe284ba3a939aed1bcb50dc694c89ef1eb08bf3d62e7d9e0902a4e"
}

View file

@ -1,43 +0,0 @@
{
"db_name": "SQLite",
"query": "\n SELECT \n id as \"id!: i64\",\n scheme,\n host,\n port,\n network as \"network!: String\",\n first_seen_at\n FROM monero_nodes \n ORDER BY id\n ",
"describe": {
"columns": [
{
"name": "id!: i64",
"ordinal": 0,
"type_info": "Integer"
},
{
"name": "scheme",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "host",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "port",
"ordinal": 3,
"type_info": "Integer"
},
{
"name": "network!: String",
"ordinal": 4,
"type_info": "Text"
},
{
"name": "first_seen_at",
"ordinal": 5,
"type_info": "Text"
}
],
"parameters": {
"Right": 0
},
"nullable": [false, false, false, false, false, false]
},
"hash": "37157927724c8bc647bf4f76f5698631cbd40637778dfa83e8f644ae6a7cf75b"
}

View file

@ -1,18 +0,0 @@
{
"db_name": "SQLite",
"query": "\n INSERT INTO monero_nodes (scheme, host, port, network, first_seen_at, updated_at)\n VALUES (?, ?, ?, ?, ?, ?)\n ON CONFLICT(scheme, host, port) DO UPDATE SET\n network = excluded.network,\n updated_at = excluded.updated_at\n RETURNING id\n ",
"describe": {
"columns": [
{
"name": "id",
"ordinal": 0,
"type_info": "Integer"
}
],
"parameters": {
"Right": 6
},
"nullable": [false]
},
"hash": "3870c77c7c5fbb9bdd57c365765178a08de20e442a07f3e734e61c410e4f338e"
}

View file

@ -0,0 +1,28 @@
{
"db_name": "SQLite",
"query": "\n SELECT \n n.scheme,\n n.host,\n n.port\n FROM monero_nodes n\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count\n FROM (\n SELECT node_id, was_successful\n FROM health_checks \n ORDER BY timestamp DESC \n LIMIT 1000\n ) recent_checks\n GROUP BY node_id\n ) stats ON n.id = stats.node_id\n WHERE n.network = ?\n ORDER BY \n CASE \n WHEN (COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0)) > 0 \n THEN CAST(COALESCE(stats.success_count, 0) AS REAL) / CAST(COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0) AS REAL)\n ELSE 0.0 \n END DESC\n LIMIT ?\n ",
"describe": {
"columns": [
{
"name": "scheme",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "host",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "port",
"ordinal": 2,
"type_info": "Integer"
}
],
"parameters": {
"Right": 2
},
"nullable": [false, false, false]
},
"hash": "44ddff5bdf5b56e9c1a9848641181de4441c8974b2d1304804874cf620420ad4"
}

View file

@ -1,12 +0,0 @@
{
"db_name": "SQLite",
"query": "\n INSERT INTO health_checks (node_id, timestamp, was_successful, latency_ms)\n VALUES (?, ?, ?, ?)\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 4
},
"nullable": []
},
"hash": "56549d93f0e2106297b85565a52b2d9ac64d5b50fb7aa6028be3fcf266fc1d5d"
}

View file

@ -1,110 +0,0 @@
{
"db_name": "SQLite",
"query": "\n SELECT \n n.id as \"id!: i64\",\n n.scheme,\n n.host,\n n.port,\n n.network,\n n.first_seen_at,\n CAST(COALESCE(stats.success_count, 0) AS INTEGER) as \"success_count!: i64\",\n CAST(COALESCE(stats.failure_count, 0) AS INTEGER) as \"failure_count!: i64\",\n stats.last_success as \"last_success?: String\",\n stats.last_failure as \"last_failure?: String\",\n stats.last_checked as \"last_checked?: String\",\n CAST(CASE WHEN reliable_nodes.node_id IS NOT NULL THEN 1 ELSE 0 END AS INTEGER) as \"is_reliable!: i64\",\n stats.avg_latency_ms as \"avg_latency_ms?: f64\",\n stats.min_latency_ms as \"min_latency_ms?: f64\",\n stats.max_latency_ms as \"max_latency_ms?: f64\",\n stats.last_latency_ms as \"last_latency_ms?: f64\"\n FROM monero_nodes n\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,\n MAX(CASE WHEN was_successful THEN timestamp END) as last_success,\n MAX(CASE WHEN NOT was_successful THEN timestamp END) as last_failure,\n MAX(timestamp) as last_checked,\n AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms,\n MIN(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as min_latency_ms,\n MAX(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as max_latency_ms,\n (SELECT latency_ms FROM health_checks hc2 WHERE hc2.node_id = health_checks.node_id ORDER BY timestamp DESC LIMIT 1) as last_latency_ms\n FROM health_checks \n GROUP BY node_id\n ) stats ON n.id = stats.node_id\n LEFT JOIN (\n SELECT DISTINCT node_id FROM (\n SELECT \n n2.id as node_id,\n COALESCE(s2.success_count, 0) as success_count,\n COALESCE(s2.failure_count, 0) as failure_count,\n s2.avg_latency_ms,\n (CAST(COALESCE(s2.success_count, 0) AS REAL) / CAST(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0) AS REAL)) * \n (MIN(COALESCE(s2.success_count, 0) + 
COALESCE(s2.failure_count, 0), 200) / 200.0) * 0.8 +\n CASE \n WHEN s2.avg_latency_ms IS NOT NULL THEN (1.0 - (MIN(s2.avg_latency_ms, 2000) / 2000.0)) * 0.2\n ELSE 0.0 \n END as reliability_score\n FROM monero_nodes n2\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,\n AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms\n FROM health_checks \n GROUP BY node_id\n ) s2 ON n2.id = s2.node_id\n WHERE n2.network = ? AND (COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0)) > 0\n ORDER BY reliability_score DESC\n LIMIT 4\n )\n ) reliable_nodes ON n.id = reliable_nodes.node_id\n WHERE n.network = ? AND stats.success_count > 0\n ORDER BY stats.avg_latency_ms ASC, stats.success_count DESC\n ",
"describe": {
"columns": [
{
"name": "id!: i64",
"ordinal": 0,
"type_info": "Integer"
},
{
"name": "scheme",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "host",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "port",
"ordinal": 3,
"type_info": "Integer"
},
{
"name": "network",
"ordinal": 4,
"type_info": "Text"
},
{
"name": "first_seen_at",
"ordinal": 5,
"type_info": "Text"
},
{
"name": "success_count!: i64",
"ordinal": 6,
"type_info": "Null"
},
{
"name": "failure_count!: i64",
"ordinal": 7,
"type_info": "Null"
},
{
"name": "last_success?: String",
"ordinal": 8,
"type_info": "Null"
},
{
"name": "last_failure?: String",
"ordinal": 9,
"type_info": "Null"
},
{
"name": "last_checked?: String",
"ordinal": 10,
"type_info": "Null"
},
{
"name": "is_reliable!: i64",
"ordinal": 11,
"type_info": "Null"
},
{
"name": "avg_latency_ms?: f64",
"ordinal": 12,
"type_info": "Null"
},
{
"name": "min_latency_ms?: f64",
"ordinal": 13,
"type_info": "Null"
},
{
"name": "max_latency_ms?: f64",
"ordinal": 14,
"type_info": "Null"
},
{
"name": "last_latency_ms?: f64",
"ordinal": 15,
"type_info": "Float"
}
],
"parameters": {
"Right": 2
},
"nullable": [
true,
false,
false,
false,
false,
false,
null,
null,
null,
null,
null,
null,
null,
null,
null,
true
]
},
"hash": "75ad770e6f70443871f919c26c189aaefc306e2a72b456fc2d03d4aa870e150b"
}

View file

@ -0,0 +1,28 @@
{
"db_name": "SQLite",
"query": "\n SELECT \n COUNT(*) as total,\n CAST(SUM(CASE WHEN stats.success_count > 0 THEN 1 ELSE 0 END) AS INTEGER) as \"reachable!: i64\",\n CAST(SUM(CASE WHEN stats.success_count > stats.failure_count AND stats.success_count > 0 THEN 1 ELSE 0 END) AS INTEGER) as \"reliable!: i64\"\n FROM monero_nodes n\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count\n FROM health_checks \n GROUP BY node_id\n ) stats ON n.id = stats.node_id\n WHERE n.network = ?\n ",
"describe": {
"columns": [
{
"name": "total",
"ordinal": 0,
"type_info": "Integer"
},
{
"name": "reachable!: i64",
"ordinal": 1,
"type_info": "Integer"
},
{
"name": "reliable!: i64",
"ordinal": 2,
"type_info": "Integer"
}
],
"parameters": {
"Right": 1
},
"nullable": [false, true, true]
},
"hash": "7bc8d637e7cf020bff58d05109ae88e56672e95e0e638af99e82df5b00595e77"
}

View file

@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "\n INSERT INTO health_checks (node_id, timestamp, was_successful, latency_ms)\n SELECT id, datetime('now'), ?, ?\n FROM monero_nodes \n WHERE scheme = ? AND host = ? AND port = ?\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 5
},
"nullable": []
},
"hash": "9047f0683f1cf956e9b367b4e85d61fe0ca4b4f7a6ae5986025601b2000565d9"
}

View file

@ -1,110 +0,0 @@
{
"db_name": "SQLite",
"query": "\n SELECT \n n.id as \"id!: i64\",\n n.scheme,\n n.host,\n n.port,\n n.network,\n n.first_seen_at,\n CAST(COALESCE(stats.success_count, 0) AS INTEGER) as \"success_count!: i64\",\n CAST(COALESCE(stats.failure_count, 0) AS INTEGER) as \"failure_count!: i64\",\n stats.last_success as \"last_success?: String\",\n stats.last_failure as \"last_failure?: String\",\n stats.last_checked as \"last_checked?: String\",\n CAST(CASE WHEN reliable_nodes.node_id IS NOT NULL THEN 1 ELSE 0 END AS INTEGER) as \"is_reliable!: i64\",\n stats.avg_latency_ms as \"avg_latency_ms?: f64\",\n stats.min_latency_ms as \"min_latency_ms?: f64\",\n stats.max_latency_ms as \"max_latency_ms?: f64\",\n stats.last_latency_ms as \"last_latency_ms?: f64\"\n FROM monero_nodes n\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,\n MAX(CASE WHEN was_successful THEN timestamp END) as last_success,\n MAX(CASE WHEN NOT was_successful THEN timestamp END) as last_failure,\n MAX(timestamp) as last_checked,\n AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms,\n MIN(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as min_latency_ms,\n MAX(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as max_latency_ms,\n (SELECT latency_ms FROM health_checks hc2 WHERE hc2.node_id = health_checks.node_id ORDER BY timestamp DESC LIMIT 1) as last_latency_ms\n FROM health_checks \n GROUP BY node_id\n ) stats ON n.id = stats.node_id\n LEFT JOIN (\n SELECT DISTINCT node_id FROM (\n SELECT \n n2.id as node_id,\n COALESCE(s2.success_count, 0) as success_count,\n COALESCE(s2.failure_count, 0) as failure_count,\n s2.avg_latency_ms,\n (CAST(COALESCE(s2.success_count, 0) AS REAL) / CAST(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0) AS REAL)) * \n (MIN(COALESCE(s2.success_count, 0) + 
COALESCE(s2.failure_count, 0), 200) / 200.0) * 0.8 +\n CASE \n WHEN s2.avg_latency_ms IS NOT NULL THEN (1.0 - (MIN(s2.avg_latency_ms, 2000) / 2000.0)) * 0.2\n ELSE 0.0 \n END as reliability_score\n FROM monero_nodes n2\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,\n AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms\n FROM health_checks \n GROUP BY node_id\n ) s2 ON n2.id = s2.node_id\n WHERE n2.network = ? AND (COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0)) > 0\n ORDER BY reliability_score DESC\n LIMIT 4\n )\n ) reliable_nodes ON n.id = reliable_nodes.node_id\n WHERE n.network = ?\n ORDER BY RANDOM()\n LIMIT ?\n ",
"describe": {
"columns": [
{
"name": "id!: i64",
"ordinal": 0,
"type_info": "Integer"
},
{
"name": "scheme",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "host",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "port",
"ordinal": 3,
"type_info": "Integer"
},
{
"name": "network",
"ordinal": 4,
"type_info": "Text"
},
{
"name": "first_seen_at",
"ordinal": 5,
"type_info": "Text"
},
{
"name": "success_count!: i64",
"ordinal": 6,
"type_info": "Null"
},
{
"name": "failure_count!: i64",
"ordinal": 7,
"type_info": "Null"
},
{
"name": "last_success?: String",
"ordinal": 8,
"type_info": "Null"
},
{
"name": "last_failure?: String",
"ordinal": 9,
"type_info": "Null"
},
{
"name": "last_checked?: String",
"ordinal": 10,
"type_info": "Null"
},
{
"name": "is_reliable!: i64",
"ordinal": 11,
"type_info": "Null"
},
{
"name": "avg_latency_ms?: f64",
"ordinal": 12,
"type_info": "Null"
},
{
"name": "min_latency_ms?: f64",
"ordinal": 13,
"type_info": "Null"
},
{
"name": "max_latency_ms?: f64",
"ordinal": 14,
"type_info": "Null"
},
{
"name": "last_latency_ms?: f64",
"ordinal": 15,
"type_info": "Float"
}
],
"parameters": {
"Right": 3
},
"nullable": [
true,
false,
false,
false,
false,
false,
null,
null,
null,
null,
null,
null,
null,
null,
null,
true
]
},
"hash": "9f6d042ab61e1d3d652d85c7d77d86a847c4a25d4ee0eab57380d10b94d2686d"
}

View file

@ -1,12 +0,0 @@
{
"db_name": "SQLite",
"query": "\n UPDATE monero_nodes \n SET network = ?, updated_at = ?\n WHERE scheme = ? AND host = ? AND port = ?\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 5
},
"nullable": []
},
"hash": "b6d85d42bf72888afa22e27710e8cfe3885ed226ae6ae02d6585c1f2f4140d68"
}

View file

@ -1,28 +0,0 @@
{
"db_name": "SQLite",
"query": "\n SELECT \n COUNT(*) as total,\n CAST(SUM(CASE WHEN stats.success_count > 0 THEN 1 ELSE 0 END) AS INTEGER) as \"reachable!: i64\",\n CAST((SELECT COUNT(*) FROM (\n SELECT n2.id\n FROM monero_nodes n2\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,\n AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms\n FROM health_checks \n GROUP BY node_id\n ) s2 ON n2.id = s2.node_id\n WHERE n2.network = ? AND (COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0)) > 0\n ORDER BY \n (CAST(COALESCE(s2.success_count, 0) AS REAL) / CAST(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0) AS REAL)) * \n (MIN(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0), 200) / 200.0) * 0.8 +\n CASE \n WHEN s2.avg_latency_ms IS NOT NULL THEN (1.0 - (MIN(s2.avg_latency_ms, 2000) / 2000.0)) * 0.2\n ELSE 0.0 \n END DESC\n LIMIT 4\n )) AS INTEGER) as \"reliable!: i64\"\n FROM monero_nodes n\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count\n FROM health_checks \n GROUP BY node_id\n ) stats ON n.id = stats.node_id\n WHERE n.network = ?\n ",
"describe": {
"columns": [
{
"name": "total",
"ordinal": 0,
"type_info": "Integer"
},
{
"name": "reachable!: i64",
"ordinal": 1,
"type_info": "Integer"
},
{
"name": "reliable!: i64",
"ordinal": 2,
"type_info": "Integer"
}
],
"parameters": {
"Right": 2
},
"nullable": [false, true, false]
},
"hash": "ffa1b76d20c86d6bea02bd03e5e7de159adbb7c7c0ef585ce4df9ec648bea7f8"
}

View file

@ -13,7 +13,6 @@ anyhow = "1"
axum = { version = "0.7", features = ["macros"] } axum = { version = "0.7", features = ["macros"] }
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4.0", features = ["derive"] } clap = { version = "4.0", features = ["derive"] }
dirs = "5.0"
futures = "0.3" futures = "0.3"
monero = { version = "0.12", features = ["serde_support"] } monero = { version = "0.12", features = ["serde_support"] }
monero-rpc = { path = "../monero-rpc" } monero-rpc = { path = "../monero-rpc" }

View file

@ -0,0 +1,45 @@
-- Migration: seed the node pool with the default node list used by Feather
-- Wallet, replacing whatever was previously discovered.
-- This clears older nodes (and their health history) from the database so the
-- pool starts from a known-good, curated set.

-- Delete all previously known nodes.
DELETE FROM monero_nodes;
-- Delete all recorded health checks (they reference the deleted nodes).
DELETE FROM health_checks;

-- Mainnet nodes. INSERT OR IGNORE guards against (scheme, host, port)
-- uniqueness conflicts if a node is somehow already present.
INSERT OR IGNORE INTO monero_nodes (scheme, host, port, network, first_seen_at) VALUES
-- These support https
('https', 'node3-us.monero.love', 18081, 'mainnet', datetime('now')),
('https', 'xmr-node.cakewallet.com', 18081, 'mainnet', datetime('now')),
('https', 'node2.monerodevs.org', 18089, 'mainnet', datetime('now')),
('https', 'node3.monerodevs.org', 18089, 'mainnet', datetime('now')),
('https', 'node.sethforprivacy.com', 18089, 'mainnet', datetime('now')),
('https', 'xmr.stormycloud.org', 18089, 'mainnet', datetime('now')),
('https', 'node2-eu.monero.love', 18089, 'mainnet', datetime('now')),
('https', 'rucknium.me', 18081, 'mainnet', datetime('now')),
-- These do not support https
('http', 'singapore.node.xmr.pm', 18089, 'mainnet', datetime('now')),
('http', 'node.majesticbank.is', 18089, 'mainnet', datetime('now')),
('http', 'node.majesticbank.at', 18089, 'mainnet', datetime('now')),
('http', 'ravfx.its-a-node.org', 18081, 'mainnet', datetime('now')),
('http', 'ravfx2.its-a-node.org', 18089, 'mainnet', datetime('now')),
('http', 'selsta1.featherwallet.net', 18081, 'mainnet', datetime('now')),
('http', 'selsta2.featherwallet.net', 18081, 'mainnet', datetime('now')),
('http', 'node.trocador.app', 18089, 'mainnet', datetime('now')),
('http', 'node.xmr.ru', 18081, 'mainnet', datetime('now'));

-- Stagenet nodes (includes two .onion hosts, reachable only over Tor).
INSERT OR IGNORE INTO monero_nodes (scheme, host, port, network, first_seen_at) VALUES
('https', 'node.sethforprivacy.com', 38089, 'stagenet', datetime('now')),
('https', 'xmr-lux.boldsuck.org', 38081, 'stagenet', datetime('now')),
('http', 'node2.sethforprivacy.com', 38089, 'stagenet', datetime('now')),
('http', 'stagenet.xmr-tw.org', 38081, 'stagenet', datetime('now')),
('http', 'singapore.node.xmr.pm', 38081, 'stagenet', datetime('now')),
('http', 'node.monerodevs.org', 38089, 'stagenet', datetime('now')),
('http', 'node2.monerodevs.org', 38089, 'stagenet', datetime('now')),
('http', 'node3.monerodevs.org', 38089, 'stagenet', datetime('now')),
('http', 'plowsoffjexmxalw73tkjmf422gq6575fc7vicuu4javzn2ynnte6tyd.onion', 38089, 'stagenet', datetime('now')),
('http', 'plowsof3t5hogddwabaeiyrno25efmzfxyro2vligremt7sxpsclfaid.onion', 38089, 'stagenet', datetime('now')),
('https', 'stagenet.xmr.ditatompel.com', 38081, 'stagenet', datetime('now'));

View file

@ -1,108 +1,9 @@
use std::path::PathBuf; use std::path::PathBuf;
use crate::types::{NodeAddress, NodeHealthStats, NodeMetadata, NodeRecord};
use anyhow::Result; use anyhow::Result;
use dirs::data_dir;
use serde::{Deserialize, Serialize};
use sqlx::SqlitePool; use sqlx::SqlitePool;
use tracing::{debug, info, warn}; use tracing::{info, warn};
/// A Monero daemon RPC endpoint together with health statistics that are
/// aggregated from the `health_checks` table at query time.
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct MoneroNode {
/// Database row id; `None` until the node has been persisted.
pub id: Option<i64>,
pub scheme: String, // http or https
pub host: String,
pub port: i64,
pub network: String, // mainnet, stagenet, or testnet - always known at insertion time
pub first_seen_at: String, // ISO 8601 timestamp when first discovered
// Computed fields from health_checks (not stored in monero_nodes table).
// `#[sqlx(default)]` lets FromRow succeed on queries that omit them.
#[sqlx(default)]
pub success_count: i64,
#[sqlx(default)]
pub failure_count: i64,
#[sqlx(default)]
pub last_success: Option<String>,
#[sqlx(default)]
pub last_failure: Option<String>,
#[sqlx(default)]
pub last_checked: Option<String>,
// True when the node ranks among the top nodes by reliability score
// (computed in SQL; see the get_identified_nodes query).
#[sqlx(default)]
pub is_reliable: bool,
#[sqlx(default)]
pub avg_latency_ms: Option<f64>,
#[sqlx(default)]
pub min_latency_ms: Option<f64>,
#[sqlx(default)]
pub max_latency_ms: Option<f64>,
#[sqlx(default)]
pub last_latency_ms: Option<f64>,
}
/// One row of the `health_checks` table: the outcome of a single probe
/// against a node.
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct HealthCheck {
pub id: Option<i64>, // database row id; None before insertion
pub node_id: i64, // foreign key into monero_nodes
pub timestamp: String, // ISO 8601 timestamp
pub was_successful: bool,
pub latency_ms: Option<f64>, // measured round-trip time, when available
}
impl MoneroNode {
/// Build a freshly-discovered node record with zeroed health statistics.
/// `first_seen_at` is stamped with the current UTC time in RFC 3339 form.
pub fn new(scheme: String, host: String, port: i64, network: String) -> Self {
// TODO: Do this in the database
let first_seen_at = chrono::Utc::now().to_rfc3339();
Self {
id: None,
scheme,
host,
port,
network,
first_seen_at,
// No health checks have been recorded yet.
success_count: 0,
failure_count: 0,
last_success: None,
last_failure: None,
last_checked: None,
is_reliable: false,
avg_latency_ms: None,
min_latency_ms: None,
max_latency_ms: None,
last_latency_ms: None,
}
}
/// Render the node as `scheme://host:port`.
pub fn full_url(&self) -> String {
format!("{}://{}:{}", self.scheme, self.host, self.port)
}
/// Fraction of health checks that succeeded; 0.0 when none were recorded.
pub fn success_rate(&self) -> f64 {
match self.success_count + self.failure_count {
0 => 0.0,
total => self.success_count as f64 / total as f64,
}
}
/// Composite reliability score in [0, 1]: the success rate weighted by
/// sample size (capped at 200 observations), optionally blended with a
/// normalized latency factor (80% success component, 20% latency).
pub fn reliability_score(&self) -> f64 {
let total_requests = self.success_count + self.failure_count;
// More observations make the measured success rate more trustworthy.
let request_weight = (total_requests as f64).min(200.0) / 200.0;
let base = self.success_rate() * request_weight;
match self.avg_latency_ms {
// Latency is normalized against a 0-2000ms window; lower is better.
Some(avg) => base * 0.8 + (1.0 - (avg.min(2000.0) / 2000.0)) * 0.2,
None => base,
}
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct Database { pub struct Database {
@ -110,19 +11,15 @@ pub struct Database {
} }
impl Database { impl Database {
pub async fn new() -> Result<Self> { pub async fn new(data_dir: PathBuf) -> Result<Self> {
let app_data_dir = get_app_data_dir()?;
Self::new_with_data_dir(app_data_dir).await
}
pub async fn new_with_data_dir(data_dir: PathBuf) -> Result<Self> {
if !data_dir.exists() { if !data_dir.exists() {
std::fs::create_dir_all(&data_dir)?; std::fs::create_dir_all(&data_dir)?;
info!("Created application data directory: {}", data_dir.display()); info!("Created application data directory: {}", data_dir.display());
} }
let db_path = data_dir.join("nodes.db"); let db_path = data_dir.join("nodes.db");
info!("Using database at: {}", db_path.display());
info!("Using database at {}", db_path.display());
let database_url = format!("sqlite:{}?mode=rwc", db_path.display()); let database_url = format!("sqlite:{}?mode=rwc", db_path.display());
let pool = SqlitePool::connect(&database_url).await?; let pool = SqlitePool::connect(&database_url).await?;
@ -134,81 +31,9 @@ impl Database {
} }
async fn migrate(&self) -> Result<()> { async fn migrate(&self) -> Result<()> {
// Run sqlx migrations
sqlx::migrate!("./migrations").run(&self.pool).await?; sqlx::migrate!("./migrations").run(&self.pool).await?;
info!("Database migration completed"); info!("Database migration completed");
Ok(())
}
/// Insert a node if it doesn't exist, return the node_id
///
/// Keyed on (scheme, host, port): an existing row keeps its first_seen_at
/// but has its network and updated_at refreshed.
pub async fn upsert_node(
&self,
scheme: &str,
host: &str,
port: i64,
network: &str,
) -> Result<i64> {
// Single timestamp reused for both first_seen_at and updated_at.
let now = chrono::Utc::now().to_rfc3339();
// RETURNING id yields the row id in both the insert and the update case.
let result = sqlx::query!(
r#"
INSERT INTO monero_nodes (scheme, host, port, network, first_seen_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(scheme, host, port) DO UPDATE SET
network = excluded.network,
updated_at = excluded.updated_at
RETURNING id
"#,
scheme,
host,
port,
network,
now,
now
)
.fetch_one(&self.pool)
.await?;
Ok(result.id)
}
/// Update a node's network after it has been identified
pub async fn update_node_network(
&self,
scheme: &str,
host: &str,
port: i64,
network: &str,
) -> Result<()> {
let now = chrono::Utc::now().to_rfc3339();
let result = sqlx::query!(
r#"
UPDATE monero_nodes
SET network = ?, updated_at = ?
WHERE scheme = ? AND host = ? AND port = ?
"#,
network,
now,
scheme,
host,
port
)
.execute(&self.pool)
.await?;
if result.rows_affected() > 0 {
debug!(
"Updated network for node {}://{}:{} to {}",
scheme, host, port, network
);
} else {
warn!(
"Failed to update network for node {}://{}:{}: not found",
scheme, host, port
);
}
Ok(()) Ok(())
} }
@ -222,151 +47,34 @@ impl Database {
was_successful: bool, was_successful: bool,
latency_ms: Option<f64>, latency_ms: Option<f64>,
) -> Result<()> { ) -> Result<()> {
let now = chrono::Utc::now().to_rfc3339(); let result = sqlx::query!(
r#"
// First get the node_id INSERT INTO health_checks (node_id, timestamp, was_successful, latency_ms)
let node_row = sqlx::query!( SELECT id, datetime('now'), ?, ?
"SELECT id FROM monero_nodes WHERE scheme = ? AND host = ? AND port = ?", FROM monero_nodes
WHERE scheme = ? AND host = ? AND port = ?
"#,
was_successful,
latency_ms,
scheme, scheme,
host, host,
port port
) )
.fetch_optional(&self.pool) .execute(&self.pool)
.await?; .await?;
let node_id = match node_row { if result.rows_affected() == 0 {
Some(row) => row.id,
None => {
warn!( warn!(
"Cannot record health check for unknown node: {}://{}:{}", "Cannot record health check for unknown node: {}://{}:{}",
scheme, host, port scheme, host, port
); );
return Ok(());
} }
};
sqlx::query!(
r#"
INSERT INTO health_checks (node_id, timestamp, was_successful, latency_ms)
VALUES (?, ?, ?, ?)
"#,
node_id,
now,
was_successful,
latency_ms
)
.execute(&self.pool)
.await?;
Ok(()) Ok(())
} }
/// Get nodes that have been identified (have network set)
///
/// Returns every node on `network`, joined with aggregate health stats and
/// an `is_reliable` flag that marks the top-4 nodes by reliability score.
pub async fn get_identified_nodes(&self, network: &str) -> Result<Vec<MoneroNode>> {
// The query builds two derived tables and left-joins both onto
// monero_nodes, so nodes with no health checks still appear:
//   stats          - per-node success/failure counts and latency aggregates
//   reliable_nodes - ids of the top 4 nodes by weighted reliability score
//                    (success rate scaled by sample size * 0.8 + latency * 0.2)
let rows = sqlx::query!(
r#"
SELECT
n.id as "id!: i64",
n.scheme,
n.host,
n.port,
n.network,
n.first_seen_at,
CAST(COALESCE(stats.success_count, 0) AS INTEGER) as "success_count!: i64",
CAST(COALESCE(stats.failure_count, 0) AS INTEGER) as "failure_count!: i64",
stats.last_success as "last_success?: String",
stats.last_failure as "last_failure?: String",
stats.last_checked as "last_checked?: String",
CAST(CASE WHEN reliable_nodes.node_id IS NOT NULL THEN 1 ELSE 0 END AS INTEGER) as "is_reliable!: i64",
stats.avg_latency_ms as "avg_latency_ms?: f64",
stats.min_latency_ms as "min_latency_ms?: f64",
stats.max_latency_ms as "max_latency_ms?: f64",
stats.last_latency_ms as "last_latency_ms?: f64"
FROM monero_nodes n
LEFT JOIN (
SELECT
node_id,
SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,
SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,
MAX(CASE WHEN was_successful THEN timestamp END) as last_success,
MAX(CASE WHEN NOT was_successful THEN timestamp END) as last_failure,
MAX(timestamp) as last_checked,
AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms,
MIN(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as min_latency_ms,
MAX(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as max_latency_ms,
(SELECT latency_ms FROM health_checks hc2 WHERE hc2.node_id = health_checks.node_id ORDER BY timestamp DESC LIMIT 1) as last_latency_ms
FROM health_checks
GROUP BY node_id
) stats ON n.id = stats.node_id
LEFT JOIN (
SELECT DISTINCT node_id FROM (
SELECT
n2.id as node_id,
COALESCE(s2.success_count, 0) as success_count,
COALESCE(s2.failure_count, 0) as failure_count,
s2.avg_latency_ms,
(CAST(COALESCE(s2.success_count, 0) AS REAL) / CAST(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0) AS REAL)) *
(MIN(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0), 200) / 200.0) * 0.8 +
CASE
WHEN s2.avg_latency_ms IS NOT NULL THEN (1.0 - (MIN(s2.avg_latency_ms, 2000) / 2000.0)) * 0.2
ELSE 0.0
END as reliability_score
FROM monero_nodes n2
LEFT JOIN (
SELECT
node_id,
SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,
SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,
AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms
FROM health_checks
GROUP BY node_id
) s2 ON n2.id = s2.node_id
WHERE n2.network = ? AND (COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0)) > 0
ORDER BY reliability_score DESC
LIMIT 4
)
) reliable_nodes ON n.id = reliable_nodes.node_id
WHERE n.network = ?
ORDER BY stats.avg_latency_ms ASC, stats.success_count DESC
"#,
network,
network
)
.fetch_all(&self.pool)
.await?;
// Map raw rows into MoneroNode; is_reliable comes back from SQL as 0/1.
let nodes: Vec<MoneroNode> = rows
.into_iter()
.map(|row| MoneroNode {
id: Some(row.id),
scheme: row.scheme,
host: row.host,
port: row.port,
network: row.network,
first_seen_at: row.first_seen_at,
success_count: row.success_count,
failure_count: row.failure_count,
last_success: row.last_success,
last_failure: row.last_failure,
last_checked: row.last_checked,
is_reliable: row.is_reliable != 0,
avg_latency_ms: row.avg_latency_ms,
min_latency_ms: row.min_latency_ms,
max_latency_ms: row.max_latency_ms,
last_latency_ms: row.last_latency_ms,
})
.collect();
debug!(
"Retrieved {} identified nodes for network {}",
nodes.len(),
network
);
Ok(nodes)
}
/// Get reliable nodes (top 4 by reliability score) /// Get reliable nodes (top 4 by reliability score)
pub async fn get_reliable_nodes(&self, network: &str) -> Result<Vec<MoneroNode>> { pub async fn get_reliable_nodes(&self, network: &str) -> Result<Vec<NodeRecord>> {
let rows = sqlx::query!( let rows = sqlx::query!(
r#" r#"
SELECT SELECT
@ -417,25 +125,28 @@ impl Database {
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await?; .await?;
let nodes = rows let nodes: Vec<NodeRecord> = rows
.into_iter() .into_iter()
.map(|row| MoneroNode { .map(|row| {
id: Some(row.id), let address = NodeAddress::new(row.scheme, row.host, row.port as u16);
scheme: row.scheme, let first_seen_at = row
host: row.host, .first_seen_at
port: row.port, .parse()
network: row.network, .unwrap_or_else(|_| chrono::Utc::now());
first_seen_at: row.first_seen_at,
let metadata = NodeMetadata::new(row.id, row.network, first_seen_at);
let health = NodeHealthStats {
success_count: row.success_count, success_count: row.success_count,
failure_count: row.failure_count, failure_count: row.failure_count,
last_success: row.last_success, last_success: row.last_success.and_then(|s| s.parse().ok()),
last_failure: row.last_failure, last_failure: row.last_failure.and_then(|s| s.parse().ok()),
last_checked: row.last_checked, last_checked: row.last_checked.and_then(|s| s.parse().ok()),
is_reliable: true,
avg_latency_ms: row.avg_latency_ms, avg_latency_ms: row.avg_latency_ms,
min_latency_ms: row.min_latency_ms, min_latency_ms: row.min_latency_ms,
max_latency_ms: row.max_latency_ms, max_latency_ms: row.max_latency_ms,
last_latency_ms: row.last_latency_ms, last_latency_ms: row.last_latency_ms,
};
NodeRecord::new(address, metadata, health)
}) })
.collect(); .collect();
@ -449,28 +160,7 @@ impl Database {
SELECT SELECT
COUNT(*) as total, COUNT(*) as total,
CAST(SUM(CASE WHEN stats.success_count > 0 THEN 1 ELSE 0 END) AS INTEGER) as "reachable!: i64", CAST(SUM(CASE WHEN stats.success_count > 0 THEN 1 ELSE 0 END) AS INTEGER) as "reachable!: i64",
CAST((SELECT COUNT(*) FROM ( CAST(SUM(CASE WHEN stats.success_count > stats.failure_count AND stats.success_count > 0 THEN 1 ELSE 0 END) AS INTEGER) as "reliable!: i64"
SELECT n2.id
FROM monero_nodes n2
LEFT JOIN (
SELECT
node_id,
SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,
SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,
AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms
FROM health_checks
GROUP BY node_id
) s2 ON n2.id = s2.node_id
WHERE n2.network = ? AND (COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0)) > 0
ORDER BY
(CAST(COALESCE(s2.success_count, 0) AS REAL) / CAST(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0) AS REAL)) *
(MIN(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0), 200) / 200.0) * 0.8 +
CASE
WHEN s2.avg_latency_ms IS NOT NULL THEN (1.0 - (MIN(s2.avg_latency_ms, 2000) / 2000.0)) * 0.2
ELSE 0.0
END DESC
LIMIT 4
)) AS INTEGER) as "reliable!: i64"
FROM monero_nodes n FROM monero_nodes n
LEFT JOIN ( LEFT JOIN (
SELECT SELECT
@ -482,17 +172,12 @@ impl Database {
) stats ON n.id = stats.node_id ) stats ON n.id = stats.node_id
WHERE n.network = ? WHERE n.network = ?
"#, "#,
network,
network network
) )
.fetch_one(&self.pool) .fetch_one(&self.pool)
.await?; .await?;
let total = row.total; Ok((row.total, row.reachable, row.reliable))
let reachable = row.reachable;
let reliable = row.reliable;
Ok((total, reachable, reliable))
} }
/// Get health check statistics for a network /// Get health check statistics for a network
@ -522,444 +207,52 @@ impl Database {
Ok((successful, unsuccessful)) Ok((successful, unsuccessful))
} }
/// Get top nodes based on recent success rate and latency /// Get top nodes based on success rate
pub async fn get_top_nodes_by_recent_success( pub async fn get_top_nodes_by_recent_success(
&self, &self,
network: &str, network: &str,
_recent_checks_limit: i64,
limit: i64, limit: i64,
) -> Result<Vec<MoneroNode>> { ) -> Result<Vec<NodeAddress>> {
let rows = sqlx::query!( let rows = sqlx::query!(
r#" r#"
SELECT SELECT
n.id as "id!: i64",
n.scheme, n.scheme,
n.host, n.host,
n.port, n.port
n.network,
n.first_seen_at,
CAST(COALESCE(stats.success_count, 0) AS INTEGER) as "success_count!: i64",
CAST(COALESCE(stats.failure_count, 0) AS INTEGER) as "failure_count!: i64",
stats.last_success as "last_success?: String",
stats.last_failure as "last_failure?: String",
stats.last_checked as "last_checked?: String",
CAST(CASE WHEN reliable_nodes.node_id IS NOT NULL THEN 1 ELSE 0 END AS INTEGER) as "is_reliable!: i64",
stats.avg_latency_ms as "avg_latency_ms?: f64",
stats.min_latency_ms as "min_latency_ms?: f64",
stats.max_latency_ms as "max_latency_ms?: f64",
stats.last_latency_ms as "last_latency_ms?: f64"
FROM monero_nodes n FROM monero_nodes n
LEFT JOIN ( LEFT JOIN (
SELECT SELECT
node_id, node_id,
SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count, SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,
SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count, SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count
MAX(CASE WHEN was_successful THEN timestamp END) as last_success, FROM (
MAX(CASE WHEN NOT was_successful THEN timestamp END) as last_failure, SELECT node_id, was_successful
MAX(timestamp) as last_checked,
AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms,
MIN(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as min_latency_ms,
MAX(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as max_latency_ms,
(SELECT latency_ms FROM health_checks hc2 WHERE hc2.node_id = health_checks.node_id ORDER BY timestamp DESC LIMIT 1) as last_latency_ms
FROM health_checks FROM health_checks
ORDER BY timestamp DESC
LIMIT 1000
) recent_checks
GROUP BY node_id GROUP BY node_id
) stats ON n.id = stats.node_id ) stats ON n.id = stats.node_id
LEFT JOIN ( WHERE n.network = ?
SELECT DISTINCT node_id FROM (
SELECT
n2.id as node_id,
COALESCE(s2.success_count, 0) as success_count,
COALESCE(s2.failure_count, 0) as failure_count,
s2.avg_latency_ms,
(CAST(COALESCE(s2.success_count, 0) AS REAL) / CAST(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0) AS REAL)) *
(MIN(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0), 200) / 200.0) * 0.8 +
CASE
WHEN s2.avg_latency_ms IS NOT NULL THEN (1.0 - (MIN(s2.avg_latency_ms, 2000) / 2000.0)) * 0.2
ELSE 0.0
END as reliability_score
FROM monero_nodes n2
LEFT JOIN (
SELECT
node_id,
SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,
SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,
AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms
FROM health_checks
GROUP BY node_id
) s2 ON n2.id = s2.node_id
WHERE n2.network = ? AND (COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0)) > 0
ORDER BY reliability_score DESC
LIMIT 4
)
) reliable_nodes ON n.id = reliable_nodes.node_id
WHERE n.network = ? AND (COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0)) > 0
ORDER BY ORDER BY
(CAST(COALESCE(stats.success_count, 0) AS REAL) / CAST(COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0) AS REAL)) DESC, CASE
stats.avg_latency_ms ASC WHEN (COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0)) > 0
THEN CAST(COALESCE(stats.success_count, 0) AS REAL) / CAST(COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0) AS REAL)
ELSE 0.0
END DESC
LIMIT ? LIMIT ?
"#, "#,
network, network,
network,
limit limit
) )
.fetch_all(&self.pool) .fetch_all(&self.pool)
.await?; .await?;
let nodes = rows let addresses: Vec<NodeAddress> = rows
.into_iter() .into_iter()
.map(|row| MoneroNode { .map(|row| NodeAddress::new(row.scheme, row.host, row.port as u16))
id: Some(row.id),
scheme: row.scheme,
host: row.host,
port: row.port,
network: row.network,
first_seen_at: row.first_seen_at,
success_count: row.success_count,
failure_count: row.failure_count,
last_success: row.last_success,
last_failure: row.last_failure,
last_checked: row.last_checked,
is_reliable: row.is_reliable != 0,
avg_latency_ms: row.avg_latency_ms,
min_latency_ms: row.min_latency_ms,
max_latency_ms: row.max_latency_ms,
last_latency_ms: row.last_latency_ms,
})
.collect(); .collect();
Ok(nodes) Ok(addresses)
}
/// Get identified nodes that have at least one successful health check
///
/// Same shape as `get_identified_nodes` but filtered to nodes with
/// `stats.success_count > 0`; sorted by average latency then success count.
pub async fn get_identified_nodes_with_success(
&self,
network: &str,
) -> Result<Vec<MoneroNode>> {
// stats aggregates per-node health history; reliable_nodes selects the
// top-4 ids by weighted reliability score so is_reliable can be flagged.
let rows = sqlx::query!(
r#"
SELECT
n.id as "id!: i64",
n.scheme,
n.host,
n.port,
n.network,
n.first_seen_at,
CAST(COALESCE(stats.success_count, 0) AS INTEGER) as "success_count!: i64",
CAST(COALESCE(stats.failure_count, 0) AS INTEGER) as "failure_count!: i64",
stats.last_success as "last_success?: String",
stats.last_failure as "last_failure?: String",
stats.last_checked as "last_checked?: String",
CAST(CASE WHEN reliable_nodes.node_id IS NOT NULL THEN 1 ELSE 0 END AS INTEGER) as "is_reliable!: i64",
stats.avg_latency_ms as "avg_latency_ms?: f64",
stats.min_latency_ms as "min_latency_ms?: f64",
stats.max_latency_ms as "max_latency_ms?: f64",
stats.last_latency_ms as "last_latency_ms?: f64"
FROM monero_nodes n
LEFT JOIN (
SELECT
node_id,
SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,
SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,
MAX(CASE WHEN was_successful THEN timestamp END) as last_success,
MAX(CASE WHEN NOT was_successful THEN timestamp END) as last_failure,
MAX(timestamp) as last_checked,
AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms,
MIN(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as min_latency_ms,
MAX(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as max_latency_ms,
(SELECT latency_ms FROM health_checks hc2 WHERE hc2.node_id = health_checks.node_id ORDER BY timestamp DESC LIMIT 1) as last_latency_ms
FROM health_checks
GROUP BY node_id
) stats ON n.id = stats.node_id
LEFT JOIN (
SELECT DISTINCT node_id FROM (
SELECT
n2.id as node_id,
COALESCE(s2.success_count, 0) as success_count,
COALESCE(s2.failure_count, 0) as failure_count,
s2.avg_latency_ms,
(CAST(COALESCE(s2.success_count, 0) AS REAL) / CAST(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0) AS REAL)) *
(MIN(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0), 200) / 200.0) * 0.8 +
CASE
WHEN s2.avg_latency_ms IS NOT NULL THEN (1.0 - (MIN(s2.avg_latency_ms, 2000) / 2000.0)) * 0.2
ELSE 0.0
END as reliability_score
FROM monero_nodes n2
LEFT JOIN (
SELECT
node_id,
SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,
SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,
AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms
FROM health_checks
GROUP BY node_id
) s2 ON n2.id = s2.node_id
WHERE n2.network = ? AND (COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0)) > 0
ORDER BY reliability_score DESC
LIMIT 4
)
) reliable_nodes ON n.id = reliable_nodes.node_id
WHERE n.network = ? AND stats.success_count > 0
ORDER BY stats.avg_latency_ms ASC, stats.success_count DESC
"#,
network,
network
)
.fetch_all(&self.pool)
.await?;
// Convert rows to MoneroNode; is_reliable arrives as a 0/1 integer.
let nodes: Vec<MoneroNode> = rows
.into_iter()
.map(|row| MoneroNode {
id: Some(row.id),
scheme: row.scheme,
host: row.host,
port: row.port,
network: row.network,
first_seen_at: row.first_seen_at,
success_count: row.success_count,
failure_count: row.failure_count,
last_success: row.last_success,
last_failure: row.last_failure,
last_checked: row.last_checked,
is_reliable: row.is_reliable != 0,
avg_latency_ms: row.avg_latency_ms,
min_latency_ms: row.min_latency_ms,
max_latency_ms: row.max_latency_ms,
last_latency_ms: row.last_latency_ms,
})
.collect();
debug!(
"Retrieved {} identified nodes with success for network {}",
nodes.len(),
network
);
Ok(nodes)
}
/// Get random nodes for the specified network, excluding specific IDs
pub async fn get_random_nodes(
&self,
network: &str,
limit: i64,
exclude_ids: &[i64],
) -> Result<Vec<MoneroNode>> {
if exclude_ids.is_empty() {
let rows = sqlx::query!(
r#"
SELECT
n.id as "id!: i64",
n.scheme,
n.host,
n.port,
n.network,
n.first_seen_at,
CAST(COALESCE(stats.success_count, 0) AS INTEGER) as "success_count!: i64",
CAST(COALESCE(stats.failure_count, 0) AS INTEGER) as "failure_count!: i64",
stats.last_success as "last_success?: String",
stats.last_failure as "last_failure?: String",
stats.last_checked as "last_checked?: String",
CAST(CASE WHEN reliable_nodes.node_id IS NOT NULL THEN 1 ELSE 0 END AS INTEGER) as "is_reliable!: i64",
stats.avg_latency_ms as "avg_latency_ms?: f64",
stats.min_latency_ms as "min_latency_ms?: f64",
stats.max_latency_ms as "max_latency_ms?: f64",
stats.last_latency_ms as "last_latency_ms?: f64"
FROM monero_nodes n
LEFT JOIN (
SELECT
node_id,
SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,
SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,
MAX(CASE WHEN was_successful THEN timestamp END) as last_success,
MAX(CASE WHEN NOT was_successful THEN timestamp END) as last_failure,
MAX(timestamp) as last_checked,
AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms,
MIN(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as min_latency_ms,
MAX(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as max_latency_ms,
(SELECT latency_ms FROM health_checks hc2 WHERE hc2.node_id = health_checks.node_id ORDER BY timestamp DESC LIMIT 1) as last_latency_ms
FROM health_checks
GROUP BY node_id
) stats ON n.id = stats.node_id
LEFT JOIN (
SELECT DISTINCT node_id FROM (
SELECT
n2.id as node_id,
COALESCE(s2.success_count, 0) as success_count,
COALESCE(s2.failure_count, 0) as failure_count,
s2.avg_latency_ms,
(CAST(COALESCE(s2.success_count, 0) AS REAL) / CAST(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0) AS REAL)) *
(MIN(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0), 200) / 200.0) * 0.8 +
CASE
WHEN s2.avg_latency_ms IS NOT NULL THEN (1.0 - (MIN(s2.avg_latency_ms, 2000) / 2000.0)) * 0.2
ELSE 0.0
END as reliability_score
FROM monero_nodes n2
LEFT JOIN (
SELECT
node_id,
SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,
SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,
AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms
FROM health_checks
GROUP BY node_id
) s2 ON n2.id = s2.node_id
WHERE n2.network = ? AND (COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0)) > 0
ORDER BY reliability_score DESC
LIMIT 4
)
) reliable_nodes ON n.id = reliable_nodes.node_id
WHERE n.network = ?
ORDER BY RANDOM()
LIMIT ?
"#,
network,
network,
limit
)
.fetch_all(&self.pool)
.await?;
return Ok(rows
.into_iter()
.map(|row| MoneroNode {
id: Some(row.id),
scheme: row.scheme,
host: row.host,
port: row.port,
network: row.network,
first_seen_at: row.first_seen_at,
success_count: row.success_count,
failure_count: row.failure_count,
last_success: row.last_success,
last_failure: row.last_failure,
last_checked: row.last_checked,
is_reliable: row.is_reliable != 0,
avg_latency_ms: row.avg_latency_ms,
min_latency_ms: row.min_latency_ms,
max_latency_ms: row.max_latency_ms,
last_latency_ms: row.last_latency_ms,
})
.collect());
}
// If exclude_ids is not empty, we need to handle it differently
// For now, get all nodes and filter in Rust (can be optimized with dynamic SQL)
let fetch_limit = limit + exclude_ids.len() as i64 + 10; // Get extra to account for exclusions
let all_rows = sqlx::query!(
r#"
SELECT
n.id as "id!: i64",
n.scheme,
n.host,
n.port,
n.network,
n.first_seen_at,
CAST(COALESCE(stats.success_count, 0) AS INTEGER) as "success_count!: i64",
CAST(COALESCE(stats.failure_count, 0) AS INTEGER) as "failure_count!: i64",
stats.last_success as "last_success?: String",
stats.last_failure as "last_failure?: String",
stats.last_checked as "last_checked?: String",
CAST(CASE WHEN reliable_nodes.node_id IS NOT NULL THEN 1 ELSE 0 END AS INTEGER) as "is_reliable!: i64",
stats.avg_latency_ms as "avg_latency_ms?: f64",
stats.min_latency_ms as "min_latency_ms?: f64",
stats.max_latency_ms as "max_latency_ms?: f64",
stats.last_latency_ms as "last_latency_ms?: f64"
FROM monero_nodes n
LEFT JOIN (
SELECT
node_id,
SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,
SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,
MAX(CASE WHEN was_successful THEN timestamp END) as last_success,
MAX(CASE WHEN NOT was_successful THEN timestamp END) as last_failure,
MAX(timestamp) as last_checked,
AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms,
MIN(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as min_latency_ms,
MAX(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as max_latency_ms,
(SELECT latency_ms FROM health_checks hc2 WHERE hc2.node_id = health_checks.node_id ORDER BY timestamp DESC LIMIT 1) as last_latency_ms
FROM health_checks
GROUP BY node_id
) stats ON n.id = stats.node_id
LEFT JOIN (
SELECT DISTINCT node_id FROM (
SELECT
n2.id as node_id,
COALESCE(s2.success_count, 0) as success_count,
COALESCE(s2.failure_count, 0) as failure_count,
s2.avg_latency_ms,
(CAST(COALESCE(s2.success_count, 0) AS REAL) / CAST(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0) AS REAL)) *
(MIN(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0), 200) / 200.0) * 0.8 +
CASE
WHEN s2.avg_latency_ms IS NOT NULL THEN (1.0 - (MIN(s2.avg_latency_ms, 2000) / 2000.0)) * 0.2
ELSE 0.0
END as reliability_score
FROM monero_nodes n2
LEFT JOIN (
SELECT
node_id,
SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,
SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,
AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms
FROM health_checks
GROUP BY node_id
) s2 ON n2.id = s2.node_id
WHERE n2.network = ? AND (COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0)) > 0
ORDER BY reliability_score DESC
LIMIT 4
)
) reliable_nodes ON n.id = reliable_nodes.node_id
WHERE n.network = ?
ORDER BY RANDOM()
LIMIT ?
"#,
network,
network,
fetch_limit
)
.fetch_all(&self.pool)
.await?;
// Convert exclude_ids to a HashSet for O(1) lookup
let exclude_set: std::collections::HashSet<i64> = exclude_ids.iter().cloned().collect();
let nodes: Vec<MoneroNode> = all_rows
.into_iter()
.filter(|row| !exclude_set.contains(&row.id))
.take(limit as usize)
.map(|row| MoneroNode {
id: Some(row.id),
scheme: row.scheme,
host: row.host,
port: row.port,
network: row.network,
first_seen_at: row.first_seen_at,
success_count: row.success_count,
failure_count: row.failure_count,
last_success: row.last_success,
last_failure: row.last_failure,
last_checked: row.last_checked,
is_reliable: row.is_reliable != 0,
avg_latency_ms: row.avg_latency_ms,
min_latency_ms: row.min_latency_ms,
max_latency_ms: row.max_latency_ms,
last_latency_ms: row.last_latency_ms,
})
.collect();
Ok(nodes)
} }
} }
/// Resolve the per-user data directory for monero-rpc-pool, creating it
/// (and logging the creation) the first time it is needed.
pub fn get_app_data_dir() -> Result<PathBuf> {
let base_dir =
data_dir().ok_or_else(|| anyhow::anyhow!("Could not determine system data directory"))?;
let app_dir = base_dir.join("monero-rpc-pool");
// Already present: nothing to create.
if app_dir.exists() {
return Ok(app_dir);
}
std::fs::create_dir_all(&app_dir)?;
info!("Created application data directory: {}", app_dir.display());
Ok(app_dir)
}

View file

@ -1,399 +0,0 @@
use std::collections::HashSet;
use std::time::{Duration, Instant};
use anyhow::Result;
use monero::Network;
use rand::seq::SliceRandom;
use reqwest::Client;
use serde::Deserialize;
use serde_json::Value;
use tracing::{error, info, warn};
use url;
use crate::database::Database;
/// Top-level shape of the monero.fail `nodes.json` payload; only the
/// "monero" chain section is deserialized.
#[derive(Debug, Deserialize)]
struct MoneroFailResponse {
monero: MoneroNodes,
}
/// Node URL lists inside the monero.fail response.
#[derive(Debug, Deserialize)]
struct MoneroNodes {
clear: Vec<String>,
// Absent in some responses, so default to an empty list.
#[serde(default)]
web_compatible: Vec<String>,
}
/// Result of probing one node's `get_info` endpoint.
#[derive(Debug)]
pub struct HealthCheckOutcome {
pub was_successful: bool,
pub latency: Duration, // wall-clock time from request start to response
pub discovered_network: Option<Network>, // network reported by the node, when parseable
}
/// Discovers Monero nodes from external sources and probes their health,
/// persisting results via the shared `Database`.
#[derive(Clone)]
pub struct NodeDiscovery {
client: Client, // reused HTTP client for API fetches and RPC probes
db: Database,
}
/// Lowercase textual form of a `Network`, matching the values stored in
/// the `monero_nodes.network` column.
fn network_to_string(network: &Network) -> String {
let name = match network {
Network::Mainnet => "mainnet",
Network::Stagenet => "stagenet",
Network::Testnet => "testnet",
};
name.to_owned()
}
impl NodeDiscovery {
/// Build a discovery service backed by `db`, using an HTTP client with a
/// 10-second timeout and a stable user agent.
pub fn new(db: Database) -> Result<Self> {
let client = Client::builder()
.user_agent("monero-rpc-pool/1.0")
.timeout(Duration::from_secs(10))
.build()
.map_err(|e| anyhow::anyhow!("Failed to build HTTP client: {}", e))?;
Ok(Self { db, client })
}
/// Fetch nodes from monero.fail API
///
/// Merges the clearnet and web-compatible lists, removes duplicates, and
/// returns them in random order.
pub async fn fetch_mainnet_nodes_from_api(&self) -> Result<Vec<String>> {
let url = "https://monero.fail/nodes.json?chain=monero";

let response = self
.client
.get(url)
.timeout(Duration::from_secs(30))
.send()
.await?;
if !response.status().is_success() {
return Err(anyhow::anyhow!("HTTP error: {}", response.status()));
}

let payload: MoneroFailResponse = response.json().await?;

// Chain web_compatible first, then clear, dropping duplicates while
// keeping the first occurrence of each address (O(n) via HashSet).
let mut seen = HashSet::new();
let mut unique_nodes: Vec<String> = payload
.monero
.web_compatible
.into_iter()
.chain(payload.monero.clear)
.filter(|node| seen.insert(node.clone()))
.collect();

// Randomize so callers don't always hit the same nodes first.
unique_nodes.shuffle(&mut rand::thread_rng());

info!(
"Fetched {} mainnet nodes from monero.fail API",
unique_nodes.len()
);

Ok(unique_nodes)
}
/// Fetch nodes from monero.fail API and discover from other sources
///
/// A fetch failure is logged and swallowed; only insertion errors propagate.
pub async fn discover_nodes_from_sources(&self, target_network: Network) -> Result<()> {
// External discovery is mainnet-only, so stagenet/testnet databases
// are never polluted with mainnet hosts.
if target_network != Network::Mainnet {
return Ok(());
}

match self.fetch_mainnet_nodes_from_api().await {
Ok(nodes) => {
self.discover_and_insert_nodes(target_network, nodes)
.await?;
}
Err(e) => warn!("Failed to fetch nodes from monero.fail API: {}", e),
}

Ok(())
}
/// Enhanced health check that detects network and validates node identity
pub async fn check_node_health(
&self,
scheme: &str,
host: &str,
port: i64,
) -> Result<HealthCheckOutcome> {
let start_time = Instant::now();
let rpc_request = serde_json::json!({
"jsonrpc": "2.0",
"id": "0",
"method": "get_info"
});
let node_url = format!("{}://{}:{}/json_rpc", scheme, host, port);
let response = self.client.post(&node_url).json(&rpc_request).send().await;
let latency = start_time.elapsed();
match response {
Ok(resp) => {
if resp.status().is_success() {
match resp.json::<Value>().await {
Ok(json) => {
if let Some(result) = json.get("result") {
// Extract network information from get_info response
let discovered_network = self.extract_network_from_info(result);
Ok(HealthCheckOutcome {
was_successful: true,
latency,
discovered_network,
})
} else {
Ok(HealthCheckOutcome {
was_successful: false,
latency,
discovered_network: None,
})
}
}
Err(_e) => Ok(HealthCheckOutcome {
was_successful: false,
latency,
discovered_network: None,
}),
}
} else {
Ok(HealthCheckOutcome {
was_successful: false,
latency,
discovered_network: None,
})
}
}
Err(_e) => Ok(HealthCheckOutcome {
was_successful: false,
latency,
discovered_network: None,
}),
}
}
/// Extract network type from get_info response
fn extract_network_from_info(&self, info_result: &Value) -> Option<Network> {
// Check nettype field (0 = mainnet, 1 = testnet, 2 = stagenet)
if let Some(nettype) = info_result.get("nettype").and_then(|v| v.as_u64()) {
return match nettype {
0 => Some(Network::Mainnet),
1 => Some(Network::Testnet),
2 => Some(Network::Stagenet),
_ => None,
};
}
// Fallback: check if testnet or stagenet is mentioned in fields
if let Some(testnet) = info_result.get("testnet").and_then(|v| v.as_bool()) {
return if testnet {
Some(Network::Testnet)
} else {
Some(Network::Mainnet)
};
}
// Additional heuristics could be added here
None
}
/// Updated health check workflow with identification and validation logic
pub async fn health_check_all_nodes(&self, target_network: Network) -> Result<()> {
info!(
"Starting health check for all nodes targeting network: {}",
network_to_string(&target_network)
);
// Get all nodes from database with proper field mapping
let all_nodes = sqlx::query!(
r#"
SELECT
id as "id!: i64",
scheme,
host,
port,
network as "network!: String",
first_seen_at
FROM monero_nodes
ORDER BY id
"#
)
.fetch_all(&self.db.pool)
.await?;
let mut checked_count = 0;
let mut healthy_count = 0;
let mut corrected_count = 0;
for node in all_nodes {
match self
.check_node_health(&node.scheme, &node.host, node.port)
.await
{
Ok(outcome) => {
// Always record the health check
self.db
.record_health_check(
&node.scheme,
&node.host,
node.port,
outcome.was_successful,
if outcome.was_successful {
Some(outcome.latency.as_millis() as f64)
} else {
None
},
)
.await?;
if outcome.was_successful {
healthy_count += 1;
// Validate network consistency
if let Some(discovered_network) = outcome.discovered_network {
let discovered_network_str = network_to_string(&discovered_network);
if node.network != discovered_network_str {
let node_url =
format!("{}://{}:{}", node.scheme, node.host, node.port);
warn!("Network mismatch detected for node {}: stored={}, discovered={}. Correcting...",
node_url, node.network, discovered_network_str);
self.db
.update_node_network(
&node.scheme,
&node.host,
node.port,
&discovered_network_str,
)
.await?;
corrected_count += 1;
}
}
}
checked_count += 1;
}
Err(_e) => {
self.db
.record_health_check(&node.scheme, &node.host, node.port, false, None)
.await?;
}
}
// Small delay to avoid hammering nodes
tokio::time::sleep(Duration::from_secs(2)).await;
}
info!(
"Health check completed: {}/{} nodes healthy, {} corrected",
healthy_count, checked_count, corrected_count
);
Ok(())
}
/// Periodic discovery task with improved error handling
pub async fn periodic_discovery_task(&self, target_network: Network) -> Result<()> {
let mut interval = tokio::time::interval(Duration::from_secs(3600)); // Every hour
loop {
interval.tick().await;
info!(
"Running periodic node discovery for network: {}",
network_to_string(&target_network)
);
// Discover new nodes from sources
if let Err(e) = self.discover_nodes_from_sources(target_network).await {
error!("Failed to discover nodes: {}", e);
}
// Health check all nodes (will identify networks automatically)
if let Err(e) = self.health_check_all_nodes(target_network).await {
error!("Failed to perform health check: {}", e);
}
// Log stats for all networks
for network in &[Network::Mainnet, Network::Stagenet, Network::Testnet] {
let network_str = network_to_string(network);
if let Ok((total, reachable, reliable)) = self.db.get_node_stats(&network_str).await
{
if total > 0 {
info!(
"Node stats for {}: {} total, {} reachable, {} reliable",
network_str, total, reachable, reliable
);
}
}
}
}
}
/// Insert configured nodes for a specific network
pub async fn discover_and_insert_nodes(
&self,
target_network: Network,
nodes: Vec<String>,
) -> Result<()> {
let mut success_count = 0;
let mut error_count = 0;
let target_network_str = network_to_string(&target_network);
for node_url in nodes.iter() {
if let Ok(url) = url::Url::parse(node_url) {
let scheme = url.scheme();
// Validate scheme - must be http or https
if !matches!(scheme, "http" | "https") {
continue;
}
// Validate host - must be non-empty
let Some(host) = url.host_str() else {
continue;
};
if host.is_empty() {
continue;
}
// Validate port - must be present
let Some(port) = url.port() else {
continue;
};
let port = port as i64;
match self
.db
.upsert_node(scheme, host, port, &target_network_str)
.await
{
Ok(_) => {
success_count += 1;
}
Err(e) => {
error_count += 1;
error!(
"Failed to insert configured node {}://{}:{}: {}",
scheme, host, port, e
);
}
}
} else {
error_count += 1;
error!("Failed to parse node URL: {}", node_url);
}
}
info!(
"Configured node insertion complete: {} successful, {} errors",
success_count, error_count
);
Ok(())
}
}

View file

@ -6,46 +6,49 @@ use axum::{
Router, Router,
}; };
use monero::Network; use monero::Network;
use tokio::sync::RwLock;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tower_http::cors::CorsLayer; use tower_http::cors::CorsLayer;
use tracing::{error, info}; use tracing::{error, info};
fn network_to_string(network: &Network) -> String { pub trait ToNetworkString {
match network { fn to_network_string(&self) -> String;
}
impl ToNetworkString for Network {
fn to_network_string(&self) -> String {
match self {
Network::Mainnet => "mainnet".to_string(), Network::Mainnet => "mainnet".to_string(),
Network::Stagenet => "stagenet".to_string(), Network::Stagenet => "stagenet".to_string(),
Network::Testnet => "testnet".to_string(), Network::Testnet => "testnet".to_string(),
} }
}
} }
pub mod config; pub mod config;
pub mod database; pub mod database;
pub mod discovery;
pub mod pool; pub mod pool;
pub mod simple_handlers; pub mod proxy;
pub mod types;
use config::Config; use config::Config;
use database::Database; use database::Database;
use discovery::NodeDiscovery;
use pool::{NodePool, PoolStatus}; use pool::{NodePool, PoolStatus};
use simple_handlers::{simple_proxy_handler, simple_stats_handler}; use proxy::{proxy_handler, stats_handler};
#[derive(Clone)] #[derive(Clone)]
pub struct AppState { pub struct AppState {
pub node_pool: Arc<RwLock<NodePool>>, pub node_pool: Arc<NodePool>,
} }
/// Manages background tasks for the RPC pool /// Manages background tasks for the RPC pool
pub struct PoolHandle { pub struct PoolHandle {
pub status_update_handle: JoinHandle<()>, pub status_update_handle: JoinHandle<()>,
pub discovery_handle: JoinHandle<()>,
} }
impl Drop for PoolHandle { impl Drop for PoolHandle {
fn drop(&mut self) { fn drop(&mut self) {
self.status_update_handle.abort(); self.status_update_handle.abort();
self.discovery_handle.abort();
} }
} }
@ -65,64 +68,41 @@ async fn create_app_with_receiver(
PoolHandle, PoolHandle,
)> { )> {
// Initialize database // Initialize database
let db = Database::new_with_data_dir(config.data_dir.clone()).await?; let db = Database::new(config.data_dir.clone()).await?;
// Initialize node pool with network // Initialize node pool with network
let network_str = network_to_string(&network); let network_str = network.to_network_string();
let (node_pool, status_receiver) = NodePool::new(db.clone(), network_str.clone()); let (node_pool, status_receiver) = NodePool::new(db.clone(), network_str.clone());
let node_pool = Arc::new(RwLock::new(node_pool)); let node_pool = Arc::new(node_pool);
// Initialize discovery service
let discovery = NodeDiscovery::new(db.clone())?;
// Publish initial status immediately to ensure first event is sent // Publish initial status immediately to ensure first event is sent
{ if let Err(e) = node_pool.publish_status_update().await {
let pool_guard = node_pool.read().await;
if let Err(e) = pool_guard.publish_status_update().await {
error!("Failed to publish initial status update: {}", e); error!("Failed to publish initial status update: {}", e);
} }
}
// Start background tasks // Send status updates every 10 seconds
let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
let node_pool_for_health_check = node_pool.clone(); let node_pool_for_health_check = node_pool.clone();
let status_update_handle = tokio::spawn(async move { let status_update_handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
loop { loop {
interval.tick().await; if let Err(e) = node_pool_for_health_check.publish_status_update().await {
// Publish status update
let pool_guard = node_pool_for_health_check.read().await;
if let Err(e) = pool_guard.publish_status_update().await {
error!("Failed to publish status update: {}", e); error!("Failed to publish status update: {}", e);
} }
}
});
// Start periodic discovery task interval.tick().await;
let discovery_clone = discovery.clone();
let network_clone = network;
let discovery_handle = tokio::spawn(async move {
if let Err(e) = discovery_clone.periodic_discovery_task(network_clone).await {
error!(
"Periodic discovery task failed for network {}: {}",
network_to_string(&network_clone),
e
);
} }
}); });
let pool_handle = PoolHandle { let pool_handle = PoolHandle {
status_update_handle, status_update_handle,
discovery_handle,
}; };
let app_state = AppState { node_pool }; let app_state = AppState { node_pool };
// Build the app // Build the app
let app = Router::new() let app = Router::new()
.route("/stats", get(simple_stats_handler)) .route("/stats", get(stats_handler))
.route("/*path", any(simple_proxy_handler)) .route("/*path", any(proxy_handler))
.layer(CorsLayer::permissive()) .layer(CorsLayer::permissive())
.with_state(app_state); .with_state(app_state);

View file

@ -1,10 +1,7 @@
use clap::Parser; use clap::Parser;
use tracing::{info, warn};
use tracing_subscriber::{self, EnvFilter};
use monero_rpc_pool::database::Database;
use monero_rpc_pool::discovery::NodeDiscovery;
use monero_rpc_pool::{config::Config, run_server}; use monero_rpc_pool::{config::Config, run_server};
use tracing::info;
use tracing_subscriber::{self, EnvFilter};
use monero::Network; use monero::Network;
@ -20,6 +17,7 @@ fn parse_network(s: &str) -> Result<Network, String> {
} }
} }
// TODO: Replace with Display impl for Network
fn network_to_string(network: &Network) -> String { fn network_to_string(network: &Network) -> String {
match network { match network {
Network::Mainnet => "mainnet".to_string(), Network::Mainnet => "mainnet".to_string(),
@ -41,10 +39,6 @@ struct Args {
#[arg(help = "Port to bind the server to")] #[arg(help = "Port to bind the server to")]
port: u16, port: u16,
#[arg(long, value_delimiter = ',')]
#[arg(help = "Comma-separated list of Monero node URLs (overrides network-based discovery)")]
nodes: Option<Vec<String>>,
#[arg(short, long, default_value = "mainnet")] #[arg(short, long, default_value = "mainnet")]
#[arg(help = "Network to use for automatic node discovery")] #[arg(help = "Network to use for automatic node discovery")]
#[arg(value_parser = parse_network)] #[arg(value_parser = parse_network)]
@ -55,117 +49,28 @@ struct Args {
verbose: bool, verbose: bool,
} }
// Custom filter function that overrides log levels for our crate
fn create_level_override_filter(base_filter: &str) -> EnvFilter {
// Parse the base filter and modify it to treat all monero_rpc_pool logs as trace
let mut filter = EnvFilter::new(base_filter);
// Add a directive that treats all levels from our crate as trace
filter = filter.add_directive("monero_rpc_pool=trace".parse().unwrap());
filter
}
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse(); let args = Args::parse();
// Create a filter that treats all logs from our crate as traces
let base_filter = if args.verbose {
// In verbose mode, show logs from other crates at WARN level
"warn"
} else {
// In normal mode, show logs from other crates at ERROR level
"error"
};
let filter = create_level_override_filter(base_filter);
tracing_subscriber::fmt() tracing_subscriber::fmt()
.with_env_filter(filter) .with_env_filter(EnvFilter::new("trace"))
.with_target(false) .with_target(false)
.with_file(true) .with_file(true)
.with_line_number(true) .with_line_number(true)
.init(); .init();
// Store node count for later logging before potentially moving args.nodes
let manual_node_count = args.nodes.as_ref().map(|nodes| nodes.len());
// Determine nodes to use and set up discovery
let _nodes = if let Some(manual_nodes) = args.nodes {
info!(
"Using manually specified nodes for network: {}",
network_to_string(&args.network)
);
// Insert manual nodes into database with network information
let db = Database::new().await?;
let discovery = NodeDiscovery::new(db.clone())?;
let mut parsed_nodes = Vec::new();
for node_url in &manual_nodes {
// Parse the URL to extract components
if let Ok(url) = url::Url::parse(node_url) {
let scheme = url.scheme().to_string();
let _protocol = if scheme == "https" { "ssl" } else { "tcp" };
let host = url.host_str().unwrap_or("").to_string();
let port = url
.port()
.unwrap_or(if scheme == "https" { 443 } else { 80 })
as i64;
let full_url = format!("{}://{}:{}", scheme, host, port);
// Insert into database
if let Err(e) = db
.upsert_node(&scheme, &host, port, &network_to_string(&args.network))
.await
{
warn!("Failed to insert manual node {}: {}", node_url, e);
} else {
parsed_nodes.push(full_url);
}
} else {
warn!("Failed to parse manual node URL: {}", node_url);
}
}
// Use manual nodes for discovery
discovery
.discover_and_insert_nodes(args.network, manual_nodes)
.await?;
parsed_nodes
} else {
info!(
"Setting up automatic node discovery for {} network",
network_to_string(&args.network)
);
let db = Database::new().await?;
let discovery = NodeDiscovery::new(db.clone())?;
// Start discovery process
discovery.discover_nodes_from_sources(args.network).await?;
Vec::new() // Return empty vec for consistency
};
let config = Config::new_with_port( let config = Config::new_with_port(
args.host, args.host,
args.port, args.port,
std::env::temp_dir().join("monero-rpc-pool"), std::env::temp_dir().join("monero-rpc-pool"),
); );
let node_count_msg = if args.verbose {
match manual_node_count {
Some(count) => format!("{} manual nodes configured", count),
None => "using automatic discovery".to_string(),
}
} else {
"configured".to_string()
};
info!( info!(
"Starting Monero RPC Pool\nConfiguration:\n Host: {}\n Port: {}\n Network: {}\n Nodes: {}", host = config.host,
config.host, config.port, network_to_string(&args.network), node_count_msg port = config.port,
network = network_to_string(&args.network),
"Starting Monero RPC Pool"
); );
if let Err(e) = run_server(config, args.network).await { if let Err(e) = run_server(config, args.network).await {

View file

@ -1,10 +1,10 @@
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use rand::prelude::*;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tracing::debug; use tracing::{debug, warn};
use typeshare::typeshare; use typeshare::typeshare;
use crate::database::Database; use crate::database::Database;
use crate::types::NodeAddress;
#[derive(Debug, Clone, serde::Serialize)] #[derive(Debug, Clone, serde::Serialize)]
#[typeshare] #[typeshare]
@ -43,64 +43,6 @@ impl NodePool {
(pool, status_receiver) (pool, status_receiver)
} }
/// Get next node using Power of Two Choices algorithm
/// Only considers identified nodes (nodes with network set)
pub async fn get_next_node(&self) -> Result<Option<String>> {
let candidate_nodes = self.db.get_identified_nodes(&self.network).await?;
if candidate_nodes.is_empty() {
debug!("No identified nodes available for network {}", self.network);
return Ok(None);
}
if candidate_nodes.len() == 1 {
return Ok(Some(candidate_nodes[0].full_url()));
}
// Power of Two Choices: pick 2 random nodes, select the better one
let mut rng = thread_rng();
let node1 = candidate_nodes.choose(&mut rng).unwrap();
let node2 = candidate_nodes.choose(&mut rng).unwrap();
let selected =
if self.calculate_goodness_score(node1) >= self.calculate_goodness_score(node2) {
node1
} else {
node2
};
debug!(
"Selected node using P2C for network {}: {}",
self.network,
selected.full_url()
);
Ok(Some(selected.full_url()))
}
/// Calculate goodness score based on usage-based recency
/// Score is a function of success rate and latency from last N health checks
fn calculate_goodness_score(&self, node: &crate::database::MoneroNode) -> f64 {
let total_checks = node.success_count + node.failure_count;
if total_checks == 0 {
return 0.0;
}
let success_rate = node.success_count as f64 / total_checks as f64;
// Weight by recency (more recent interactions = higher weight)
let recency_weight = (total_checks as f64).min(200.0) / 200.0;
let mut score = success_rate * recency_weight;
// Factor in latency - lower latency = higher score
if let Some(avg_latency) = node.avg_latency_ms {
let latency_factor = 1.0 - (avg_latency.min(2000.0) / 2000.0);
score = score * 0.8 + latency_factor * 0.2; // 80% success rate, 20% latency
}
score
}
pub async fn record_success( pub async fn record_success(
&self, &self,
scheme: &str, scheme: &str,
@ -123,7 +65,13 @@ impl NodePool {
pub async fn publish_status_update(&self) -> Result<()> { pub async fn publish_status_update(&self) -> Result<()> {
let status = self.get_current_status().await?; let status = self.get_current_status().await?;
let _ = self.status_sender.send(status); // Ignore if no receivers
if let Err(e) = self.status_sender.send(status.clone()) {
warn!("Failed to send status update: {}", e);
} else {
debug!(?status, "Sent status update");
}
Ok(()) Ok(())
} }
@ -139,7 +87,7 @@ impl NodePool {
.map(|node| ReliableNodeInfo { .map(|node| ReliableNodeInfo {
url: node.full_url(), url: node.full_url(),
success_rate: node.success_rate(), success_rate: node.success_rate(),
avg_latency_ms: node.avg_latency_ms, avg_latency_ms: node.health.avg_latency_ms,
}) })
.collect(); .collect();
@ -152,81 +100,63 @@ impl NodePool {
}) })
} }
/// Get top reliable nodes with fill-up logic to ensure pool size /// Get nodes to use, with weighted selection favoring top performers
/// First tries to get top nodes based on recent success, then fills up with random nodes /// The list has some randomness, but the top nodes are still more likely to be chosen
pub async fn get_top_reliable_nodes( pub async fn get_top_reliable_nodes(&self, limit: usize) -> Result<Vec<NodeAddress>> {
&self, use rand::seq::SliceRandom;
limit: usize,
) -> Result<Vec<crate::database::MoneroNode>> {
debug!( debug!(
"Getting top reliable nodes for network {} (target: {})", "Getting top reliable nodes for network {} (target: {})",
self.network, limit self.network, limit
); );
// Step 1: Try primary fetch - get top nodes based on recent success (last 200 health checks) let available_nodes = self
let mut top_nodes = self
.db .db
.get_top_nodes_by_recent_success(&self.network, 200, limit as i64) .get_top_nodes_by_recent_success(&self.network, limit as i64)
.await .await
.context("Failed to get top nodes by recent success")?; .context("Failed to get top nodes by recent success")?;
let total_candidates = available_nodes.len();
let weighted: Vec<(NodeAddress, f64)> = available_nodes
.into_iter()
.enumerate()
.map(|(idx, node)| {
// Higher-ranked (smaller idx) ⇒ larger weight
let weight = 1.5_f64.powi((total_candidates - idx) as i32);
(node, weight)
})
.collect();
let mut rng = rand::thread_rng();
let mut candidates = weighted;
let mut selected_nodes = Vec::with_capacity(limit);
while selected_nodes.len() < limit && !candidates.is_empty() {
// Choose one node based on its weight using `choose_weighted`
let chosen_pair = candidates
.choose_weighted(&mut rng, |item| item.1)
.map_err(|e| anyhow::anyhow!("Weighted choice failed: {}", e))?;
// Locate index of the chosen pair and remove it
let chosen_index = candidates
.iter()
.position(|x| std::ptr::eq(x, chosen_pair))
.expect("Chosen item must exist in candidates");
let (node, _) = candidates.swap_remove(chosen_index);
selected_nodes.push(node);
}
debug!( debug!(
"Primary fetch returned {} nodes for network {} (target: {})", "Pool size: {} nodes for network {} (target: {})",
top_nodes.len(), selected_nodes.len(),
self.network, self.network,
limit limit
); );
// Step 2: If primary fetch didn't return enough nodes, fall back to any identified nodes with successful health checks Ok(selected_nodes)
if top_nodes.len() < limit {
debug!("Primary fetch returned insufficient nodes, falling back to any identified nodes with successful health checks");
top_nodes = self
.db
.get_identified_nodes_with_success(&self.network)
.await?;
debug!(
"Fallback fetch returned {} nodes with successful health checks for network {}",
top_nodes.len(),
self.network
);
}
// Step 3: Check if we still don't have enough nodes
if top_nodes.len() < limit {
let needed = limit - top_nodes.len();
debug!(
"Pool needs {} more nodes to reach target of {} for network {}",
needed, limit, self.network
);
// Step 4: Collect exclusion IDs from nodes already selected
let exclude_ids: Vec<i64> = top_nodes.iter().filter_map(|node| node.id).collect();
// Step 5: Secondary fetch - get random nodes to fill up
let random_fillers = self
.db
.get_random_nodes(&self.network, needed as i64, &exclude_ids)
.await?;
debug!(
"Secondary fetch returned {} random nodes for network {}",
random_fillers.len(),
self.network
);
// Step 6: Combine lists
top_nodes.extend(random_fillers);
}
debug!(
"Final pool size: {} nodes for network {} (target: {})",
top_nodes.len(),
self.network,
limit
);
Ok(top_nodes)
} }
pub async fn get_pool_stats(&self) -> Result<PoolStats> { pub async fn get_pool_stats(&self) -> Result<PoolStats> {
@ -238,11 +168,11 @@ impl NodePool {
} else { } else {
let total_latency: f64 = reliable_nodes let total_latency: f64 = reliable_nodes
.iter() .iter()
.filter_map(|node| node.avg_latency_ms) .filter_map(|node| node.health.avg_latency_ms)
.sum(); .sum();
let count = reliable_nodes let count = reliable_nodes
.iter() .iter()
.filter(|node| node.avg_latency_ms.is_some()) .filter(|node| node.health.avg_latency_ms.is_some())
.count(); .count();
if count > 0 { if count > 0 {

View file

@ -16,7 +16,7 @@ enum HandlerError {
NoNodes, NoNodes,
PoolError(String), PoolError(String),
RequestError(String), RequestError(String),
AllRequestsFailed(Vec<(String, String)>), // Vec of (node_url, error_message) AllRequestsFailed(Vec<(String, String)>),
} }
impl std::fmt::Display for HandlerError { impl std::fmt::Display for HandlerError {
@ -150,8 +150,8 @@ async fn raw_http_request(
} }
async fn record_success(state: &AppState, scheme: &str, host: &str, port: i64, latency_ms: f64) { async fn record_success(state: &AppState, scheme: &str, host: &str, port: i64, latency_ms: f64) {
let node_pool_guard = state.node_pool.read().await; if let Err(e) = state
if let Err(e) = node_pool_guard .node_pool
.record_success(scheme, host, port, latency_ms) .record_success(scheme, host, port, latency_ms)
.await .await
{ {
@ -163,8 +163,7 @@ async fn record_success(state: &AppState, scheme: &str, host: &str, port: i64, l
} }
async fn record_failure(state: &AppState, scheme: &str, host: &str, port: i64) { async fn record_failure(state: &AppState, scheme: &str, host: &str, port: i64) {
let node_pool_guard = state.node_pool.read().await; if let Err(e) = state.node_pool.record_failure(scheme, host, port).await {
if let Err(e) = node_pool_guard.record_failure(scheme, host, port).await {
error!( error!(
"Failed to record failure for {}://{}:{}: {}", "Failed to record failure for {}://{}:{}: {}",
scheme, host, port, e scheme, host, port, e
@ -173,15 +172,12 @@ async fn record_failure(state: &AppState, scheme: &str, host: &str, port: i64) {
} }
async fn single_raw_request( async fn single_raw_request(
state: &AppState,
node_url: (String, String, i64), node_url: (String, String, i64),
path: &str, path: &str,
method: &str, method: &str,
headers: &HeaderMap, headers: &HeaderMap,
body: Option<&[u8]>, body: Option<&[u8]>,
) -> Result<(Response, (String, String, i64), f64), HandlerError> { ) -> Result<(Response, (String, String, i64), f64), HandlerError> {
let (scheme, host, port) = &node_url;
let start_time = Instant::now(); let start_time = Instant::now();
match raw_http_request(node_url.clone(), path, method, headers, body).await { match raw_http_request(node_url.clone(), path, method, headers, body).await {
@ -199,42 +195,37 @@ async fn single_raw_request(
.map_err(|e| HandlerError::RequestError(format!("{:#?}", e)))?; .map_err(|e| HandlerError::RequestError(format!("{:#?}", e)))?;
if is_jsonrpc_error(&body_bytes) { if is_jsonrpc_error(&body_bytes) {
record_failure(state, scheme, host, *port).await;
return Err(HandlerError::RequestError("JSON-RPC error".to_string())); return Err(HandlerError::RequestError("JSON-RPC error".to_string()));
} }
// Reconstruct response with the body we consumed // Reconstruct response with the body we consumed
let response = Response::from_parts(parts, Body::from(body_bytes)); let response = Response::from_parts(parts, Body::from(body_bytes));
record_success(state, scheme, host, *port, latency_ms).await;
Ok((response, node_url, latency_ms)) Ok((response, node_url, latency_ms))
} else { } else {
// For non-JSON-RPC endpoints, HTTP success is enough // For non-JSON-RPC endpoints, HTTP success is enough
record_success(state, scheme, host, *port, latency_ms).await;
Ok((response, node_url, latency_ms)) Ok((response, node_url, latency_ms))
} }
} else { } else {
// Non-200 status codes are failures // Non-200 status codes are failures
record_failure(state, scheme, host, *port).await;
Err(HandlerError::RequestError(format!( Err(HandlerError::RequestError(format!(
"HTTP {}", "HTTP {}",
response.status() response.status()
))) )))
} }
} }
Err(e) => { Err(e) => Err(e),
record_failure(state, scheme, host, *port).await;
Err(e)
}
} }
} }
async fn race_requests( async fn sequential_requests(
state: &AppState, state: &AppState,
path: &str, path: &str,
method: &str, method: &str,
headers: &HeaderMap, headers: &HeaderMap,
body: Option<&[u8]>, body: Option<&[u8]>,
) -> Result<Response, HandlerError> { ) -> Result<Response, HandlerError> {
const POOL_SIZE: usize = 20;
// Extract JSON-RPC method for better logging // Extract JSON-RPC method for better logging
let jsonrpc_method = if path == "/json_rpc" { let jsonrpc_method = if path == "/json_rpc" {
if let Some(body_data) = body { if let Some(body_data) = body {
@ -245,22 +236,21 @@ async fn race_requests(
} else { } else {
None None
}; };
const POOL_SIZE: usize = 20;
let mut tried_nodes = std::collections::HashSet::new(); let mut tried_nodes = 0;
let mut pool_index = 0;
let mut collected_errors: Vec<(String, String)> = Vec::new(); let mut collected_errors: Vec<(String, String)> = Vec::new();
// Get the exclusive pool of 20 nodes once at the beginning // Get the pool of nodes
let available_pool = { let available_pool = {
let node_pool_guard = state.node_pool.read().await; let nodes = state
let reliable_nodes = node_pool_guard .node_pool
.get_top_reliable_nodes(POOL_SIZE) .get_top_reliable_nodes(POOL_SIZE)
.await .await
.map_err(|e| HandlerError::PoolError(e.to_string()))?; .map_err(|e| HandlerError::PoolError(e.to_string()))?;
let pool: Vec<(String, String, i64)> = reliable_nodes let pool: Vec<(String, String, i64)> = nodes
.into_iter() .into_iter()
.map(|node| (node.scheme, node.host, node.port)) .map(|node| (node.scheme, node.host, node.port as i64))
.collect(); .collect();
pool pool
@ -270,142 +260,59 @@ async fn race_requests(
return Err(HandlerError::NoNodes); return Err(HandlerError::NoNodes);
} }
// Power of Two Choices within the exclusive pool // Try nodes one by one sequentially
while pool_index < available_pool.len() && tried_nodes.len() < POOL_SIZE { for node in available_pool.iter().take(POOL_SIZE) {
let mut node1_option = None; tried_nodes += 1;
let mut node2_option = None; let node_display = format!("{}://{}:{}", node.0, node.1, node.2);
// Select first untried node from pool
for (i, node) in available_pool.iter().enumerate().skip(pool_index) {
if !tried_nodes.contains(node) {
node1_option = Some(node.clone());
pool_index = i + 1;
break;
}
}
// Select second untried node from pool (different from first)
for node in available_pool.iter().skip(pool_index) {
if !tried_nodes.contains(node) && Some(node) != node1_option.as_ref() {
node2_option = Some(node.clone());
break;
}
}
// If we can't get any new nodes from the pool, we've exhausted our options
if node1_option.is_none() && node2_option.is_none() {
break;
}
// Store node URLs for error tracking before consuming them
let current_nodes: Vec<(String, String, i64)> = [&node1_option, &node2_option]
.iter()
.filter_map(|opt| opt.as_ref())
.cloned()
.collect();
let mut requests = Vec::new();
if let Some(node1) = node1_option {
tried_nodes.insert(node1.clone());
requests.push(single_raw_request(
state,
node1.clone(),
path,
method,
headers,
body,
));
}
if let Some(node2) = node2_option {
tried_nodes.insert(node2.clone());
requests.push(single_raw_request(
state,
node2.clone(),
path,
method,
headers,
body,
));
}
if requests.is_empty() {
break;
}
match &jsonrpc_method { match &jsonrpc_method {
Some(rpc_method) => debug!( Some(rpc_method) => debug!(
"Racing {} requests to {} (JSON-RPC: {}): {} nodes (tried {} so far)", "Trying {} request to {} (JSON-RPC: {}) - attempt {} of {}",
method, method,
path, node_display,
rpc_method, rpc_method,
requests.len(), tried_nodes,
tried_nodes.len() available_pool.len().min(POOL_SIZE)
), ),
None => debug!( None => debug!(
"Racing {} requests to {}: {} nodes (tried {} so far)", "Trying {} request to {} - attempt {} of {}",
method, method,
path, node_display,
requests.len(), tried_nodes,
tried_nodes.len() available_pool.len().min(POOL_SIZE)
), ),
} }
// Handle the requests based on how many we have match single_raw_request(node.clone(), path, method, headers, body).await {
let result = match requests.len() {
1 => {
// Only one request
requests.into_iter().next().unwrap().await
}
2 => {
// Two requests - race them
let mut iter = requests.into_iter();
let req1 = iter.next().unwrap();
let req2 = iter.next().unwrap();
tokio::select! {
result1 = req1 => result1,
result2 = req2 => result2,
}
}
_ => unreachable!("We only add 1 or 2 requests"),
};
match result {
Ok((response, winning_node, latency_ms)) => { Ok((response, winning_node, latency_ms)) => {
let (scheme, host, port) = &winning_node; let (scheme, host, port) = &winning_node;
let winning_node = format!("{}://{}:{}", scheme, host, port); let winning_node_display = format!("{}://{}:{}", scheme, host, port);
match &jsonrpc_method { match &jsonrpc_method {
Some(rpc_method) => { Some(rpc_method) => debug!(
debug!(
"{} response from {} ({}ms) - SUCCESS after trying {} nodes! JSON-RPC: {}", "{} response from {} ({}ms) - SUCCESS after trying {} nodes! JSON-RPC: {}",
method, winning_node, latency_ms, tried_nodes.len(), rpc_method method, winning_node_display, latency_ms, tried_nodes, rpc_method
) ),
}
None => debug!( None => debug!(
"{} response from {} ({}ms) - SUCCESS after trying {} nodes!", "{} response from {} ({}ms) - SUCCESS after trying {} nodes!",
method, method, winning_node_display, latency_ms, tried_nodes
winning_node,
latency_ms,
tried_nodes.len()
), ),
} }
record_success(state, scheme, host, *port, latency_ms).await;
record_success(state, &node.0, &node.1, node.2, latency_ms).await;
return Ok(response); return Ok(response);
} }
Err(e) => { Err(e) => {
// Since we don't know which specific node failed in the race, collected_errors.push((node_display.clone(), e.to_string()));
// record the error for all nodes in this batch
for (scheme, host, port) in &current_nodes {
let node_display = format!("{}://{}:{}", scheme, host, port);
collected_errors.push((node_display, e.to_string()));
}
debug!( debug!(
"Request failed: {} - retrying with different nodes from pool...", "Request failed with node {} with error {} - trying next node...",
e node_display, e
); );
record_failure(state, &node.0, &node.1, node.2).await;
continue; continue;
} }
} }
@ -421,14 +328,14 @@ async fn race_requests(
Some(rpc_method) => error!( Some(rpc_method) => error!(
"All {} requests failed after trying {} nodes (JSON-RPC: {}). Detailed errors:\n{}", "All {} requests failed after trying {} nodes (JSON-RPC: {}). Detailed errors:\n{}",
method, method,
tried_nodes.len(), tried_nodes,
rpc_method, rpc_method,
detailed_errors.join("\n") detailed_errors.join("\n")
), ),
None => error!( None => error!(
"All {} requests failed after trying {} nodes. Detailed errors:\n{}", "All {} requests failed after trying {} nodes. Detailed errors:\n{}",
method, method,
tried_nodes.len(), tried_nodes,
detailed_errors.join("\n") detailed_errors.join("\n")
), ),
} }
@ -446,7 +353,7 @@ async fn proxy_request(
headers: &HeaderMap, headers: &HeaderMap,
body: Option<&[u8]>, body: Option<&[u8]>,
) -> Response { ) -> Response {
match race_requests(state, path, method, headers, body).await { match sequential_requests(state, path, method, headers, body).await {
Ok(res) => res, Ok(res) => res,
Err(handler_error) => { Err(handler_error) => {
let error_response = match &handler_error { let error_response = match &handler_error {
@ -505,7 +412,7 @@ async fn proxy_request(
} }
#[axum::debug_handler] #[axum::debug_handler]
pub async fn simple_proxy_handler( pub async fn proxy_handler(
State(state): State<AppState>, State(state): State<AppState>,
method: Method, method: Method,
uri: axum::http::Uri, uri: axum::http::Uri,
@ -553,11 +460,9 @@ pub async fn simple_proxy_handler(
} }
#[axum::debug_handler] #[axum::debug_handler]
pub async fn simple_stats_handler(State(state): State<AppState>) -> Response { pub async fn stats_handler(State(state): State<AppState>) -> Response {
async move { async move {
let node_pool_guard = state.node_pool.read().await; match state.node_pool.get_current_status().await {
match node_pool_guard.get_current_status().await {
Ok(status) => { Ok(status) => {
let stats_json = serde_json::json!({ let stats_json = serde_json::json!({
"status": "healthy", "status": "healthy",

View file

@ -0,0 +1,119 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::fmt;
/// Network location of a Monero RPC node (`scheme://host:port`).
///
/// `Eq`/`Hash` are derived so an address can serve as a set/map key,
/// e.g. for deduplicating nodes.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct NodeAddress {
    pub scheme: String, // "http" or "https"
    pub host: String,   // host component of the URL
    pub port: u16,      // port component of the URL
}
impl NodeAddress {
    /// Creates a new address from its scheme ("http"/"https"), host, and port.
    pub fn new(scheme: String, host: String, port: u16) -> Self {
        Self { scheme, host, port }
    }

    /// Returns the address as a full URL string, e.g. `http://host:port`.
    ///
    /// Delegates to the `Display` impl (via the blanket `ToString`) so the
    /// URL format is defined in exactly one place instead of duplicating
    /// the format string here.
    pub fn full_url(&self) -> String {
        self.to_string()
    }
}
impl fmt::Display for NodeAddress {
    /// Renders the address as `scheme://host:port`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { scheme, host, port } = self;
        write!(f, "{}://{}:{}", scheme, host, port)
    }
}
/// Bookkeeping data about a node that is independent of its health:
/// its database row id, which Monero network it belongs to, and when
/// it was first recorded.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeMetadata {
    pub id: i64,          // database row id
    pub network: String,  // "mainnet", "stagenet", or "testnet"
    pub first_seen_at: DateTime<Utc>, // when this node was first recorded
}
impl NodeMetadata {
    /// Creates metadata from a database id, network name
    /// ("mainnet"/"stagenet"/"testnet"), and first-seen timestamp.
    pub fn new(id: i64, network: String, first_seen_at: DateTime<Utc>) -> Self {
        Self { id, network, first_seen_at }
    }
}
/// Health check statistics for a node.
///
/// `Default` yields an all-zero / all-`None` record for a node that has
/// never been checked.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct NodeHealthStats {
    pub success_count: i64, // number of successful health checks
    pub failure_count: i64, // number of failed health checks
    pub last_success: Option<DateTime<Utc>>, // most recent successful check
    pub last_failure: Option<DateTime<Utc>>, // most recent failed check
    pub last_checked: Option<DateTime<Utc>>, // most recent check of any outcome
    pub avg_latency_ms: Option<f64>, // average latency across successful checks
    pub min_latency_ms: Option<f64>, // fastest successful check
    pub max_latency_ms: Option<f64>, // slowest successful check
    pub last_latency_ms: Option<f64>, // latency of the most recent check
}
impl NodeHealthStats {
    /// Fraction of recorded checks that succeeded, in `[0.0, 1.0]`.
    /// Returns `0.0` when no checks have been recorded yet.
    pub fn success_rate(&self) -> f64 {
        match self.success_count + self.failure_count {
            0 => 0.0,
            total => self.success_count as f64 / total as f64,
        }
    }

    /// Composite reliability score in `[0.0, 1.0]`.
    ///
    /// The success rate is weighted by sample count (capped at 200
    /// requests), then blended 80/20 with a latency factor when an
    /// average latency is available.
    pub fn reliability_score(&self) -> f64 {
        let total_requests = self.success_count + self.failure_count;
        // More samples => more trustworthy rate; full weight at >= 200 requests.
        let request_weight = (total_requests as f64).min(200.0) / 200.0;
        let base = self.success_rate() * request_weight;
        match self.avg_latency_ms {
            // Map latency 0..2000ms onto a 1.0..0.0 factor, then blend:
            // 80% weighted success rate, 20% latency.
            Some(avg) => {
                let latency_factor = 1.0 - (avg.min(2000.0) / 2000.0);
                base * 0.8 + latency_factor * 0.2
            }
            None => base,
        }
    }
}
/// A complete node record combining address, metadata, and health stats.
///
/// All three parts are `#[serde(flatten)]`ed, so the record serializes
/// as a single flat object rather than three nested ones.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeRecord {
    #[serde(flatten)]
    pub address: NodeAddress,
    #[serde(flatten)]
    pub metadata: NodeMetadata,
    #[serde(flatten)]
    pub health: NodeHealthStats,
}
impl NodeRecord {
    /// Bundles an address with its metadata and health statistics.
    pub fn new(address: NodeAddress, metadata: NodeMetadata, health: NodeHealthStats) -> Self {
        Self { address, metadata, health }
    }

    /// Full URL of the node; delegates to [`NodeAddress::full_url`].
    pub fn full_url(&self) -> String {
        self.address.full_url()
    }

    /// Success rate of recorded checks; delegates to
    /// [`NodeHealthStats::success_rate`].
    pub fn success_rate(&self) -> f64 {
        self.health.success_rate()
    }

    /// Composite reliability score; delegates to
    /// [`NodeHealthStats::reliability_score`].
    pub fn reliability_score(&self) -> f64 {
        self.health.reliability_score()
    }
}

View file

@ -7,7 +7,6 @@ use crate::common::tracing_util::Format;
use crate::database::{open_db, AccessMode}; use crate::database::{open_db, AccessMode};
use crate::env::{Config as EnvConfig, GetConfig, Mainnet, Testnet}; use crate::env::{Config as EnvConfig, GetConfig, Mainnet, Testnet};
use crate::fs::system_data_dir; use crate::fs::system_data_dir;
use crate::monero::wallet_rpc;
use crate::monero::Wallets; use crate::monero::Wallets;
use crate::network::rendezvous::XmrBtcNamespace; use crate::network::rendezvous::XmrBtcNamespace;
use crate::protocol::Database; use crate::protocol::Database;
@ -376,11 +375,13 @@ impl ContextBuilder {
(), (),
); );
// Handle the different monero configurations // If we are instructed to use a pool, we start it and use it
// Otherwise we use the single node address provided by the user
let (monero_node_address, rpc_pool_handle) = match monero_config { let (monero_node_address, rpc_pool_handle) = match monero_config {
MoneroNodeConfig::Pool => { MoneroNodeConfig::Pool => {
// Start RPC pool and use it // Start RPC pool and use it
match monero_rpc_pool::start_server_with_random_port( let (server_info, mut status_receiver, pool_handle) =
monero_rpc_pool::start_server_with_random_port(
monero_rpc_pool::config::Config::new_random_port( monero_rpc_pool::config::Config::new_random_port(
"127.0.0.1".to_string(), "127.0.0.1".to_string(),
data_dir.join("monero-rpc-pool"), data_dir.join("monero-rpc-pool"),
@ -390,9 +391,8 @@ impl ContextBuilder {
false => crate::monero::Network::Mainnet, false => crate::monero::Network::Mainnet,
}, },
) )
.await .await?;
{
Ok((server_info, mut status_receiver, pool_handle)) => {
let rpc_url = let rpc_url =
format!("http://{}:{}", server_info.host, server_info.port); format!("http://{}:{}", server_info.host, server_info.port);
tracing::info!("Monero RPC Pool started on {}", rpc_url); tracing::info!("Monero RPC Pool started on {}", rpc_url);
@ -407,17 +407,9 @@ impl ContextBuilder {
}); });
} }
(Some(rpc_url), Some(Arc::new(pool_handle))) (rpc_url, Some(Arc::new(pool_handle)))
}
Err(e) => {
tracing::error!("Failed to start Monero RPC Pool: {}", e);
(None, None)
}
}
}
MoneroNodeConfig::SingleNode { url } => {
(if url.is_empty() { None } else { Some(url) }, None)
} }
MoneroNodeConfig::SingleNode { url } => (url, None),
}; };
let wallets = init_monero_wallet( let wallets = init_monero_wallet(
@ -583,27 +575,16 @@ async fn init_bitcoin_wallet(
async fn init_monero_wallet( async fn init_monero_wallet(
data_dir: &Path, data_dir: &Path,
monero_daemon_address: impl Into<Option<String>>, monero_daemon_address: String,
env_config: EnvConfig, env_config: EnvConfig,
tauri_handle: Option<TauriHandle>, tauri_handle: Option<TauriHandle>,
) -> Result<Arc<Wallets>> { ) -> Result<Arc<Wallets>> {
let network = env_config.monero_network; let network = env_config.monero_network;
// Use the ./monero/monero-data directory for backwards compatibility
let wallet_dir = data_dir.join("monero").join("monero-data"); let wallet_dir = data_dir.join("monero").join("monero-data");
let daemon = if let Some(addr) = monero_daemon_address.into() { let daemon = monero_sys::Daemon {
monero_sys::Daemon { address: monero_daemon_address,
address: addr,
ssl: false, ssl: false,
}
} else {
let node = wallet_rpc::choose_monero_node(env_config.monero_network).await?;
tracing::debug!(%node, "Automatically selected monero node");
monero_sys::Daemon {
address: node.to_string(),
ssl: false,
}
}; };
// This is the name of a wallet we only use for blockchain monitoring // This is the name of a wallet we only use for blockchain monitoring