From cc4069ebad2e803bdfca2c968e50bba1b13f546a Mon Sep 17 00:00:00 2001 From: Mohan <86064887+binarybaron@users.noreply.github.com> Date: Mon, 30 Jun 2025 10:53:01 +0200 Subject: [PATCH] refactor(monero-rpc-pool): Type safe SQL, simplify discovery (#443) * refactor(monero-rpc-pool): Type safe SQL, default nodes, no monero.fail * refactor * refactor * refactoring * fmt * add some randomness to node selection --- Cargo.lock | 1 - ...311800bb832efe37fe6c52181bd7bf631740c.json | 18 - ...cab4d64debeea9932c58047cc6244d136f80d.json | 110 --- ...95685fd5278103d13f56795ff5a51b0ef8036.json | 110 --- ...cb50dc694c89ef1eb08bf3d62e7d9e0902a4e.json | 110 --- ...98631cbd40637778dfa83e8f644ae6a7cf75b.json | 43 - ...178a08de20e442a07f3e734e61c410e4f338e.json | 18 - ...81de4441c8974b2d1304804874cf620420ad4.json | 28 + ...b2d9ac64d5b50fb7aa6028be3fcf266fc1d5d.json | 12 - ...89aaefc306e2a72b456fc2d03d4aa870e150b.json | 110 --- ...e88e56672e95e0e638af99e82df5b00595e77.json | 28 + ...d61fe0ca4b4f7a6ae5986025601b2000565d9.json | 12 + ...d86a847c4a25d4ee0eab57380d10b94d2686d.json | 110 --- ...8cfe3885ed226ae6ae02d6585c1f2f4140d68.json | 12 - ...7de159adbb7c7c0ef585ce4df9ec648bea7f8.json | 28 - monero-rpc-pool/Cargo.toml | 1 - ...8093515_add_default_nodes_from_feather.sql | 45 + monero-rpc-pool/src/database.rs | 835 ++---------------- monero-rpc-pool/src/discovery.rs | 399 --------- monero-rpc-pool/src/lib.rs | 74 +- monero-rpc-pool/src/main.rs | 111 +-- monero-rpc-pool/src/pool.rs | 180 ++-- .../src/{simple_handlers.rs => proxy.rs} | 193 +--- monero-rpc-pool/src/types.rs | 119 +++ swap/src/cli/api.rs | 83 +- 25 files changed, 467 insertions(+), 2323 deletions(-) delete mode 100644 monero-rpc-pool/.sqlx/query-03e5b2bccf8bffb962a56443448311800bb832efe37fe6c52181bd7bf631740c.json delete mode 100644 monero-rpc-pool/.sqlx/query-08d143b977a7fa23b289c22dee3cab4d64debeea9932c58047cc6244d136f80d.json delete mode 100644 monero-rpc-pool/.sqlx/query-0aa34e769813a40e0518f5311ff95685fd5278103d13f56795ff5a51b0ef8036.json delete mode 100644 monero-rpc-pool/.sqlx/query-2a378cb109fe284ba3a939aed1bcb50dc694c89ef1eb08bf3d62e7d9e0902a4e.json delete mode 100644 monero-rpc-pool/.sqlx/query-37157927724c8bc647bf4f76f5698631cbd40637778dfa83e8f644ae6a7cf75b.json delete mode 100644 monero-rpc-pool/.sqlx/query-3870c77c7c5fbb9bdd57c365765178a08de20e442a07f3e734e61c410e4f338e.json create mode 100644 monero-rpc-pool/.sqlx/query-44ddff5bdf5b56e9c1a9848641181de4441c8974b2d1304804874cf620420ad4.json delete mode 100644 monero-rpc-pool/.sqlx/query-56549d93f0e2106297b85565a52b2d9ac64d5b50fb7aa6028be3fcf266fc1d5d.json delete mode 100644 monero-rpc-pool/.sqlx/query-75ad770e6f70443871f919c26c189aaefc306e2a72b456fc2d03d4aa870e150b.json create mode 100644 monero-rpc-pool/.sqlx/query-7bc8d637e7cf020bff58d05109ae88e56672e95e0e638af99e82df5b00595e77.json create mode 100644 monero-rpc-pool/.sqlx/query-9047f0683f1cf956e9b367b4e85d61fe0ca4b4f7a6ae5986025601b2000565d9.json delete mode 100644 monero-rpc-pool/.sqlx/query-9f6d042ab61e1d3d652d85c7d77d86a847c4a25d4ee0eab57380d10b94d2686d.json delete mode 100644 monero-rpc-pool/.sqlx/query-b6d85d42bf72888afa22e27710e8cfe3885ed226ae6ae02d6585c1f2f4140d68.json delete mode 100644 monero-rpc-pool/.sqlx/query-ffa1b76d20c86d6bea02bd03e5e7de159adbb7c7c0ef585ce4df9ec648bea7f8.json create mode 100644 monero-rpc-pool/migrations/20250628093515_add_default_nodes_from_feather.sql delete mode 100644 monero-rpc-pool/src/discovery.rs rename monero-rpc-pool/src/{simple_handlers.rs => proxy.rs} (73%) create mode 100644 monero-rpc-pool/src/types.rs diff --git a/Cargo.lock b/Cargo.lock index f839a7ba..53f1e184 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5974,7 +5974,6 @@ dependencies = [ "axum", "chrono", "clap 4.5.40", - "dirs 5.0.1", "futures", "monero", "monero-rpc", diff --git a/monero-rpc-pool/.sqlx/query-03e5b2bccf8bffb962a56443448311800bb832efe37fe6c52181bd7bf631740c.json b/monero-rpc-pool/.sqlx/query-03e5b2bccf8bffb962a56443448311800bb832efe37fe6c52181bd7bf631740c.json deleted file mode 100644 index 089657a5..00000000 --- a/monero-rpc-pool/.sqlx/query-03e5b2bccf8bffb962a56443448311800bb832efe37fe6c52181bd7bf631740c.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "db_name": "SQLite", - "query": "SELECT id FROM monero_nodes WHERE scheme = ? AND host = ? AND port = ?", - "describe": { - "columns": [ - { - "name": "id", - "ordinal": 0, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 3 - }, - "nullable": [true] - }, - "hash": "03e5b2bccf8bffb962a56443448311800bb832efe37fe6c52181bd7bf631740c" -} diff --git a/monero-rpc-pool/.sqlx/query-08d143b977a7fa23b289c22dee3cab4d64debeea9932c58047cc6244d136f80d.json b/monero-rpc-pool/.sqlx/query-08d143b977a7fa23b289c22dee3cab4d64debeea9932c58047cc6244d136f80d.json deleted file mode 100644 index fe50e71a..00000000 --- a/monero-rpc-pool/.sqlx/query-08d143b977a7fa23b289c22dee3cab4d64debeea9932c58047cc6244d136f80d.json +++ /dev/null @@ -1,110 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n SELECT \n n.id as \"id!: i64\",\n n.scheme,\n n.host,\n n.port,\n n.network,\n n.first_seen_at,\n CAST(COALESCE(stats.success_count, 0) AS INTEGER) as \"success_count!: i64\",\n CAST(COALESCE(stats.failure_count, 0) AS INTEGER) as \"failure_count!: i64\",\n stats.last_success as \"last_success?: String\",\n stats.last_failure as \"last_failure?: String\",\n stats.last_checked as \"last_checked?: String\",\n CAST(CASE WHEN reliable_nodes.node_id IS NOT NULL THEN 1 ELSE 0 END AS INTEGER) as \"is_reliable!: i64\",\n stats.avg_latency_ms as \"avg_latency_ms?: f64\",\n stats.min_latency_ms as \"min_latency_ms?: f64\",\n stats.max_latency_ms as \"max_latency_ms?: f64\",\n stats.last_latency_ms as \"last_latency_ms?: f64\"\n FROM monero_nodes n\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,\n MAX(CASE WHEN was_successful THEN timestamp END) as last_success,\n MAX(CASE WHEN NOT was_successful THEN timestamp END) as last_failure,\n MAX(timestamp) as last_checked,\n AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms,\n MIN(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as min_latency_ms,\n MAX(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as max_latency_ms,\n (SELECT latency_ms FROM health_checks hc2 WHERE hc2.node_id = health_checks.node_id ORDER BY timestamp DESC LIMIT 1) as last_latency_ms\n FROM health_checks \n GROUP BY node_id\n ) stats ON n.id = stats.node_id\n LEFT JOIN (\n SELECT DISTINCT node_id FROM (\n SELECT \n n2.id as node_id,\n COALESCE(s2.success_count, 0) as success_count,\n COALESCE(s2.failure_count, 0) as failure_count,\n s2.avg_latency_ms,\n (CAST(COALESCE(s2.success_count, 0) AS REAL) / CAST(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0) AS REAL)) * \n (MIN(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0), 200) / 200.0) * 0.8 +\n CASE \n WHEN s2.avg_latency_ms IS NOT NULL THEN (1.0 - (MIN(s2.avg_latency_ms, 2000) / 2000.0)) * 0.2\n ELSE 0.0 \n END as reliability_score\n FROM monero_nodes n2\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,\n AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms\n FROM health_checks \n GROUP BY node_id\n ) s2 ON n2.id = s2.node_id\n WHERE n2.network = ? AND (COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0)) > 0\n ORDER BY reliability_score DESC\n LIMIT 4\n )\n ) reliable_nodes ON n.id = reliable_nodes.node_id\n WHERE n.network = ? AND (COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0)) > 0\n ORDER BY \n (CAST(COALESCE(stats.success_count, 0) AS REAL) / CAST(COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0) AS REAL)) DESC,\n stats.avg_latency_ms ASC\n LIMIT ?\n ", - "describe": { - "columns": [ - { - "name": "id!: i64", - "ordinal": 0, - "type_info": "Integer" - }, - { - "name": "scheme", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "host", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "port", - "ordinal": 3, - "type_info": "Integer" - }, - { - "name": "network", - "ordinal": 4, - "type_info": "Text" - }, - { - "name": "first_seen_at", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "success_count!: i64", - "ordinal": 6, - "type_info": "Null" - }, - { - "name": "failure_count!: i64", - "ordinal": 7, - "type_info": "Null" - }, - { - "name": "last_success?: String", - "ordinal": 8, - "type_info": "Null" - }, - { - "name": "last_failure?: String", - "ordinal": 9, - "type_info": "Null" - }, - { - "name": "last_checked?: String", - "ordinal": 10, - "type_info": "Null" - }, - { - "name": "is_reliable!: i64", - "ordinal": 11, - "type_info": "Null" - }, - { - "name": "avg_latency_ms?: f64", - "ordinal": 12, - "type_info": "Null" - }, - { - "name": "min_latency_ms?: f64", - "ordinal": 13, - "type_info": "Null" - }, - { - "name": "max_latency_ms?: f64", - "ordinal": 14, - "type_info": "Null" - }, - { - "name": "last_latency_ms?: f64", - "ordinal": 15, - "type_info": "Float" - } - ], - "parameters": { - "Right": 3 - }, - "nullable": [ - true, - false, - false, - false, - false, - false, - null, - null, - null, - null, - null, - null, - null, - null, - null, - true - ] - }, - "hash": "08d143b977a7fa23b289c22dee3cab4d64debeea9932c58047cc6244d136f80d" -} diff --git a/monero-rpc-pool/.sqlx/query-0aa34e769813a40e0518f5311ff95685fd5278103d13f56795ff5a51b0ef8036.json b/monero-rpc-pool/.sqlx/query-0aa34e769813a40e0518f5311ff95685fd5278103d13f56795ff5a51b0ef8036.json deleted file mode 100644 index 26659748..00000000 --- a/monero-rpc-pool/.sqlx/query-0aa34e769813a40e0518f5311ff95685fd5278103d13f56795ff5a51b0ef8036.json +++ /dev/null @@ -1,110 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n SELECT \n n.id as \"id!: i64\",\n n.scheme,\n n.host,\n n.port,\n n.network,\n n.first_seen_at,\n CAST(COALESCE(stats.success_count, 0) AS INTEGER) as \"success_count!: i64\",\n CAST(COALESCE(stats.failure_count, 0) AS INTEGER) as \"failure_count!: i64\",\n stats.last_success as \"last_success?: String\",\n stats.last_failure as \"last_failure?: String\",\n stats.last_checked as \"last_checked?: String\",\n CAST(CASE WHEN reliable_nodes.node_id IS NOT NULL THEN 1 ELSE 0 END AS INTEGER) as \"is_reliable!: i64\",\n stats.avg_latency_ms as \"avg_latency_ms?: f64\",\n stats.min_latency_ms as \"min_latency_ms?: f64\",\n stats.max_latency_ms as \"max_latency_ms?: f64\",\n stats.last_latency_ms as \"last_latency_ms?: f64\"\n FROM monero_nodes n\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,\n MAX(CASE WHEN was_successful THEN timestamp END) as last_success,\n MAX(CASE WHEN NOT was_successful THEN timestamp END) as last_failure,\n MAX(timestamp) as last_checked,\n AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms,\n MIN(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as min_latency_ms,\n MAX(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as max_latency_ms,\n (SELECT latency_ms FROM health_checks hc2 WHERE hc2.node_id = health_checks.node_id ORDER BY timestamp DESC LIMIT 1) as last_latency_ms\n FROM health_checks \n GROUP BY node_id\n ) stats ON n.id = stats.node_id\n LEFT JOIN (\n SELECT DISTINCT node_id FROM (\n SELECT \n n2.id as node_id,\n COALESCE(s2.success_count, 0) as success_count,\n COALESCE(s2.failure_count, 0) as failure_count,\n s2.avg_latency_ms,\n (CAST(COALESCE(s2.success_count, 0) AS REAL) / CAST(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0) AS REAL)) * \n (MIN(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0), 200) / 200.0) * 0.8 +\n CASE \n WHEN s2.avg_latency_ms IS NOT NULL THEN (1.0 - (MIN(s2.avg_latency_ms, 2000) / 2000.0)) * 0.2\n ELSE 0.0 \n END as reliability_score\n FROM monero_nodes n2\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,\n AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms\n FROM health_checks \n GROUP BY node_id\n ) s2 ON n2.id = s2.node_id\n WHERE n2.network = ? AND (COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0)) > 0\n ORDER BY reliability_score DESC\n LIMIT 4\n )\n ) reliable_nodes ON n.id = reliable_nodes.node_id\n WHERE n.network = ?\n ORDER BY stats.avg_latency_ms ASC, stats.success_count DESC\n ", - "describe": { - "columns": [ - { - "name": "id!: i64", - "ordinal": 0, - "type_info": "Integer" - }, - { - "name": "scheme", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "host", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "port", - "ordinal": 3, - "type_info": "Integer" - }, - { - "name": "network", - "ordinal": 4, - "type_info": "Text" - }, - { - "name": "first_seen_at", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "success_count!: i64", - "ordinal": 6, - "type_info": "Null" - }, - { - "name": "failure_count!: i64", - "ordinal": 7, - "type_info": "Null" - }, - { - "name": "last_success?: String", - "ordinal": 8, - "type_info": "Null" - }, - { - "name": "last_failure?: String", - "ordinal": 9, - "type_info": "Null" - }, - { - "name": "last_checked?: String", - "ordinal": 10, - "type_info": "Null" - }, - { - "name": "is_reliable!: i64", - "ordinal": 11, - "type_info": "Null" - }, - { - "name": "avg_latency_ms?: f64", - "ordinal": 12, - "type_info": "Null" - }, - { - "name": "min_latency_ms?: f64", - "ordinal": 13, - "type_info": "Null" - }, - { - "name": "max_latency_ms?: f64", - "ordinal": 14, - "type_info": "Null" - }, - { - "name": "last_latency_ms?: f64", - "ordinal": 15, - "type_info": "Float" - } - ], - "parameters": { - "Right": 2 - }, - "nullable": [ - true, - false, - false, - false, - false, - false, - null, - null, - null, - null, - null, - null, - null, - null, - null, - true - ] - }, - "hash": "0aa34e769813a40e0518f5311ff95685fd5278103d13f56795ff5a51b0ef8036" -} diff --git a/monero-rpc-pool/.sqlx/query-2a378cb109fe284ba3a939aed1bcb50dc694c89ef1eb08bf3d62e7d9e0902a4e.json b/monero-rpc-pool/.sqlx/query-2a378cb109fe284ba3a939aed1bcb50dc694c89ef1eb08bf3d62e7d9e0902a4e.json deleted file mode 100644 index 80c4546b..00000000 --- a/monero-rpc-pool/.sqlx/query-2a378cb109fe284ba3a939aed1bcb50dc694c89ef1eb08bf3d62e7d9e0902a4e.json +++ /dev/null @@ -1,110 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n SELECT \n n.id as \"id!: i64\",\n n.scheme,\n n.host,\n n.port,\n n.network,\n n.first_seen_at,\n CAST(COALESCE(stats.success_count, 0) AS INTEGER) as \"success_count!: i64\",\n CAST(COALESCE(stats.failure_count, 0) AS INTEGER) as \"failure_count!: i64\",\n stats.last_success as \"last_success?: String\",\n stats.last_failure as \"last_failure?: String\",\n stats.last_checked as \"last_checked?: String\",\n CAST(CASE WHEN reliable_nodes.node_id IS NOT NULL THEN 1 ELSE 0 END AS INTEGER) as \"is_reliable!: i64\",\n stats.avg_latency_ms as \"avg_latency_ms?: f64\",\n stats.min_latency_ms as \"min_latency_ms?: f64\",\n stats.max_latency_ms as \"max_latency_ms?: f64\",\n stats.last_latency_ms as \"last_latency_ms?: f64\"\n FROM monero_nodes n\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,\n MAX(CASE WHEN was_successful THEN timestamp END) as last_success,\n MAX(CASE WHEN NOT was_successful THEN timestamp END) as last_failure,\n MAX(timestamp) as last_checked,\n AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms,\n MIN(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as min_latency_ms,\n MAX(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as max_latency_ms,\n (SELECT latency_ms FROM health_checks hc2 WHERE hc2.node_id = health_checks.node_id ORDER BY timestamp DESC LIMIT 1) as last_latency_ms\n FROM health_checks \n GROUP BY node_id\n ) stats ON n.id = stats.node_id\n LEFT JOIN (\n SELECT DISTINCT node_id FROM (\n SELECT \n n2.id as node_id,\n COALESCE(s2.success_count, 0) as success_count,\n COALESCE(s2.failure_count, 0) as failure_count,\n s2.avg_latency_ms,\n (CAST(COALESCE(s2.success_count, 0) AS REAL) / CAST(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0) AS REAL)) * \n (MIN(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0), 200) / 200.0) * 0.8 +\n CASE \n WHEN s2.avg_latency_ms IS NOT NULL THEN (1.0 - (MIN(s2.avg_latency_ms, 2000) / 2000.0)) * 0.2\n ELSE 0.0 \n END as reliability_score\n FROM monero_nodes n2\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,\n AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms\n FROM health_checks \n GROUP BY node_id\n ) s2 ON n2.id = s2.node_id\n WHERE n2.network = ? AND (COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0)) > 0\n ORDER BY reliability_score DESC\n LIMIT 4\n )\n ) reliable_nodes ON n.id = reliable_nodes.node_id\n WHERE n.network = ?\n ORDER BY RANDOM()\n LIMIT ?\n ", - "describe": { - "columns": [ - { - "name": "id!: i64", - "ordinal": 0, - "type_info": "Integer" - }, - { - "name": "scheme", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "host", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "port", - "ordinal": 3, - "type_info": "Integer" - }, - { - "name": "network", - "ordinal": 4, - "type_info": "Text" - }, - { - "name": "first_seen_at", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "success_count!: i64", - "ordinal": 6, - "type_info": "Null" - }, - { - "name": "failure_count!: i64", - "ordinal": 7, - "type_info": "Null" - }, - { - "name": "last_success?: String", - "ordinal": 8, - "type_info": "Null" - }, - { - "name": "last_failure?: String", - "ordinal": 9, - "type_info": "Null" - }, - { - "name": "last_checked?: String", - "ordinal": 10, - "type_info": "Null" - }, - { - "name": "is_reliable!: i64", - "ordinal": 11, - "type_info": "Null" - }, - { - "name": "avg_latency_ms?: f64", - "ordinal": 12, - "type_info": "Null" - }, - { - "name": "min_latency_ms?: f64", - "ordinal": 13, - "type_info": "Null" - }, - { - "name": "max_latency_ms?: f64", - "ordinal": 14, - "type_info": "Null" - }, - { - "name": "last_latency_ms?: f64", - "ordinal": 15, - "type_info": "Float" - } - ], - "parameters": { - "Right": 3 - }, - "nullable": [ - true, - false, - false, - false, - false, - false, - null, - null, - null, - null, - null, - null, - null, - null, - null, - true - ] - }, - "hash": "2a378cb109fe284ba3a939aed1bcb50dc694c89ef1eb08bf3d62e7d9e0902a4e" -} diff --git a/monero-rpc-pool/.sqlx/query-37157927724c8bc647bf4f76f5698631cbd40637778dfa83e8f644ae6a7cf75b.json b/monero-rpc-pool/.sqlx/query-37157927724c8bc647bf4f76f5698631cbd40637778dfa83e8f644ae6a7cf75b.json deleted file mode 100644 index 6544a53b..00000000 --- a/monero-rpc-pool/.sqlx/query-37157927724c8bc647bf4f76f5698631cbd40637778dfa83e8f644ae6a7cf75b.json +++ /dev/null @@ -1,43 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n SELECT \n id as \"id!: i64\",\n scheme,\n host,\n port,\n network as \"network!: String\",\n first_seen_at\n FROM monero_nodes \n ORDER BY id\n ", - "describe": { - "columns": [ - { - "name": "id!: i64", - "ordinal": 0, - "type_info": "Integer" - }, - { - "name": "scheme", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "host", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "port", - "ordinal": 3, - "type_info": "Integer" - }, - { - "name": "network!: String", - "ordinal": 4, - "type_info": "Text" - }, - { - "name": "first_seen_at", - "ordinal": 5, - "type_info": "Text" - } - ], - "parameters": { - "Right": 0 - }, - "nullable": [false, false, false, false, false, false] - }, - "hash": "37157927724c8bc647bf4f76f5698631cbd40637778dfa83e8f644ae6a7cf75b" -} diff --git a/monero-rpc-pool/.sqlx/query-3870c77c7c5fbb9bdd57c365765178a08de20e442a07f3e734e61c410e4f338e.json b/monero-rpc-pool/.sqlx/query-3870c77c7c5fbb9bdd57c365765178a08de20e442a07f3e734e61c410e4f338e.json deleted file mode 100644 index 53b7378d..00000000 --- a/monero-rpc-pool/.sqlx/query-3870c77c7c5fbb9bdd57c365765178a08de20e442a07f3e734e61c410e4f338e.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n INSERT INTO monero_nodes (scheme, host, port, network, first_seen_at, updated_at)\n VALUES (?, ?, ?, ?, ?, ?)\n ON CONFLICT(scheme, host, port) DO UPDATE SET\n network = excluded.network,\n updated_at = excluded.updated_at\n RETURNING id\n ", - "describe": { - "columns": [ - { - "name": "id", - "ordinal": 0, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 6 - }, - "nullable": [false] - }, - "hash": "3870c77c7c5fbb9bdd57c365765178a08de20e442a07f3e734e61c410e4f338e" -} diff --git a/monero-rpc-pool/.sqlx/query-44ddff5bdf5b56e9c1a9848641181de4441c8974b2d1304804874cf620420ad4.json b/monero-rpc-pool/.sqlx/query-44ddff5bdf5b56e9c1a9848641181de4441c8974b2d1304804874cf620420ad4.json new file mode 100644 index 00000000..42863654 --- /dev/null +++ b/monero-rpc-pool/.sqlx/query-44ddff5bdf5b56e9c1a9848641181de4441c8974b2d1304804874cf620420ad4.json @@ -0,0 +1,28 @@ +{ + "db_name": "SQLite", + "query": "\n SELECT \n n.scheme,\n n.host,\n n.port\n FROM monero_nodes n\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count\n FROM (\n SELECT node_id, was_successful\n FROM health_checks \n ORDER BY timestamp DESC \n LIMIT 1000\n ) recent_checks\n GROUP BY node_id\n ) stats ON n.id = stats.node_id\n WHERE n.network = ?\n ORDER BY \n CASE \n WHEN (COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0)) > 0 \n THEN CAST(COALESCE(stats.success_count, 0) AS REAL) / CAST(COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0) AS REAL)\n ELSE 0.0 \n END DESC\n LIMIT ?\n ", + "describe": { + "columns": [ + { + "name": "scheme", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "host", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "port", + "ordinal": 2, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 2 + }, + "nullable": [false, false, false] + }, + "hash": "44ddff5bdf5b56e9c1a9848641181de4441c8974b2d1304804874cf620420ad4" +} diff --git a/monero-rpc-pool/.sqlx/query-56549d93f0e2106297b85565a52b2d9ac64d5b50fb7aa6028be3fcf266fc1d5d.json b/monero-rpc-pool/.sqlx/query-56549d93f0e2106297b85565a52b2d9ac64d5b50fb7aa6028be3fcf266fc1d5d.json deleted file mode 100644 index ab6db76f..00000000 --- a/monero-rpc-pool/.sqlx/query-56549d93f0e2106297b85565a52b2d9ac64d5b50fb7aa6028be3fcf266fc1d5d.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n INSERT INTO health_checks (node_id, timestamp, was_successful, latency_ms)\n VALUES (?, ?, ?, ?)\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 4 - }, - "nullable": [] - }, - "hash": "56549d93f0e2106297b85565a52b2d9ac64d5b50fb7aa6028be3fcf266fc1d5d" -} diff --git a/monero-rpc-pool/.sqlx/query-75ad770e6f70443871f919c26c189aaefc306e2a72b456fc2d03d4aa870e150b.json b/monero-rpc-pool/.sqlx/query-75ad770e6f70443871f919c26c189aaefc306e2a72b456fc2d03d4aa870e150b.json deleted file mode 100644 index b6e4a1e2..00000000 --- a/monero-rpc-pool/.sqlx/query-75ad770e6f70443871f919c26c189aaefc306e2a72b456fc2d03d4aa870e150b.json +++ /dev/null @@ -1,110 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n SELECT \n n.id as \"id!: i64\",\n n.scheme,\n n.host,\n n.port,\n n.network,\n n.first_seen_at,\n CAST(COALESCE(stats.success_count, 0) AS INTEGER) as \"success_count!: i64\",\n CAST(COALESCE(stats.failure_count, 0) AS INTEGER) as \"failure_count!: i64\",\n stats.last_success as \"last_success?: String\",\n stats.last_failure as \"last_failure?: String\",\n stats.last_checked as \"last_checked?: String\",\n CAST(CASE WHEN reliable_nodes.node_id IS NOT NULL THEN 1 ELSE 0 END AS INTEGER) as \"is_reliable!: i64\",\n stats.avg_latency_ms as \"avg_latency_ms?: f64\",\n stats.min_latency_ms as \"min_latency_ms?: f64\",\n stats.max_latency_ms as \"max_latency_ms?: f64\",\n stats.last_latency_ms as \"last_latency_ms?: f64\"\n FROM monero_nodes n\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,\n MAX(CASE WHEN was_successful THEN timestamp END) as last_success,\n MAX(CASE WHEN NOT was_successful THEN timestamp END) as last_failure,\n MAX(timestamp) as last_checked,\n AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms,\n MIN(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as min_latency_ms,\n MAX(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as max_latency_ms,\n (SELECT latency_ms FROM health_checks hc2 WHERE hc2.node_id = health_checks.node_id ORDER BY timestamp DESC LIMIT 1) as last_latency_ms\n FROM health_checks \n GROUP BY node_id\n ) stats ON n.id = stats.node_id\n LEFT JOIN (\n SELECT DISTINCT node_id FROM (\n SELECT \n n2.id as node_id,\n COALESCE(s2.success_count, 0) as success_count,\n COALESCE(s2.failure_count, 0) as failure_count,\n s2.avg_latency_ms,\n (CAST(COALESCE(s2.success_count, 0) AS REAL) / CAST(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0) AS REAL)) * \n (MIN(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0), 200) / 200.0) * 0.8 +\n CASE \n WHEN s2.avg_latency_ms IS NOT NULL THEN (1.0 - (MIN(s2.avg_latency_ms, 2000) / 2000.0)) * 0.2\n ELSE 0.0 \n END as reliability_score\n FROM monero_nodes n2\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,\n AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms\n FROM health_checks \n GROUP BY node_id\n ) s2 ON n2.id = s2.node_id\n WHERE n2.network = ? AND (COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0)) > 0\n ORDER BY reliability_score DESC\n LIMIT 4\n )\n ) reliable_nodes ON n.id = reliable_nodes.node_id\n WHERE n.network = ? AND stats.success_count > 0\n ORDER BY stats.avg_latency_ms ASC, stats.success_count DESC\n ", - "describe": { - "columns": [ - { - "name": "id!: i64", - "ordinal": 0, - "type_info": "Integer" - }, - { - "name": "scheme", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "host", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "port", - "ordinal": 3, - "type_info": "Integer" - }, - { - "name": "network", - "ordinal": 4, - "type_info": "Text" - }, - { - "name": "first_seen_at", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "success_count!: i64", - "ordinal": 6, - "type_info": "Null" - }, - { - "name": "failure_count!: i64", - "ordinal": 7, - "type_info": "Null" - }, - { - "name": "last_success?: String", - "ordinal": 8, - "type_info": "Null" - }, - { - "name": "last_failure?: String", - "ordinal": 9, - "type_info": "Null" - }, - { - "name": "last_checked?: String", - "ordinal": 10, - "type_info": "Null" - }, - { - "name": "is_reliable!: i64", - "ordinal": 11, - "type_info": "Null" - }, - { - "name": "avg_latency_ms?: f64", - "ordinal": 12, - "type_info": "Null" - }, - { - "name": "min_latency_ms?: f64", - "ordinal": 13, - "type_info": "Null" - }, - { - "name": "max_latency_ms?: f64", - "ordinal": 14, - "type_info": "Null" - }, - { - "name": "last_latency_ms?: f64", - "ordinal": 15, - "type_info": "Float" - } - ], - "parameters": { - "Right": 2 - }, - "nullable": [ - true, - false, - false, - false, - false, - false, - null, - null, - null, - null, - null, - null, - null, - null, - null, - true - ] - }, - "hash": "75ad770e6f70443871f919c26c189aaefc306e2a72b456fc2d03d4aa870e150b" -} diff --git a/monero-rpc-pool/.sqlx/query-7bc8d637e7cf020bff58d05109ae88e56672e95e0e638af99e82df5b00595e77.json b/monero-rpc-pool/.sqlx/query-7bc8d637e7cf020bff58d05109ae88e56672e95e0e638af99e82df5b00595e77.json new file mode 100644 index 00000000..7e94ecbb --- /dev/null +++ b/monero-rpc-pool/.sqlx/query-7bc8d637e7cf020bff58d05109ae88e56672e95e0e638af99e82df5b00595e77.json @@ -0,0 +1,28 @@ +{ + "db_name": "SQLite", + "query": "\n SELECT \n COUNT(*) as total,\n CAST(SUM(CASE WHEN stats.success_count > 0 THEN 1 ELSE 0 END) AS INTEGER) as \"reachable!: i64\",\n CAST(SUM(CASE WHEN stats.success_count > stats.failure_count AND stats.success_count > 0 THEN 1 ELSE 0 END) AS INTEGER) as \"reliable!: i64\"\n FROM monero_nodes n\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count\n FROM health_checks \n GROUP BY node_id\n ) stats ON n.id = stats.node_id\n WHERE n.network = ?\n ", + "describe": { + "columns": [ + { + "name": "total", + "ordinal": 0, + "type_info": "Integer" + }, + { + "name": "reachable!: i64", + "ordinal": 1, + "type_info": "Integer" + }, + { + "name": "reliable!: i64", + "ordinal": 2, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [false, true, true] + }, + "hash": "7bc8d637e7cf020bff58d05109ae88e56672e95e0e638af99e82df5b00595e77" +} diff --git a/monero-rpc-pool/.sqlx/query-9047f0683f1cf956e9b367b4e85d61fe0ca4b4f7a6ae5986025601b2000565d9.json b/monero-rpc-pool/.sqlx/query-9047f0683f1cf956e9b367b4e85d61fe0ca4b4f7a6ae5986025601b2000565d9.json new file mode 100644 index 00000000..30ad6c03 --- /dev/null +++ b/monero-rpc-pool/.sqlx/query-9047f0683f1cf956e9b367b4e85d61fe0ca4b4f7a6ae5986025601b2000565d9.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n INSERT INTO health_checks (node_id, timestamp, was_successful, latency_ms)\n SELECT id, datetime('now'), ?, ?\n FROM monero_nodes \n WHERE scheme = ? AND host = ? AND port = ?\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 5 + }, + "nullable": [] + }, + "hash": "9047f0683f1cf956e9b367b4e85d61fe0ca4b4f7a6ae5986025601b2000565d9" +} diff --git a/monero-rpc-pool/.sqlx/query-9f6d042ab61e1d3d652d85c7d77d86a847c4a25d4ee0eab57380d10b94d2686d.json b/monero-rpc-pool/.sqlx/query-9f6d042ab61e1d3d652d85c7d77d86a847c4a25d4ee0eab57380d10b94d2686d.json deleted file mode 100644 index bd967a26..00000000 --- a/monero-rpc-pool/.sqlx/query-9f6d042ab61e1d3d652d85c7d77d86a847c4a25d4ee0eab57380d10b94d2686d.json +++ /dev/null @@ -1,110 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n SELECT \n n.id as \"id!: i64\",\n n.scheme,\n n.host,\n n.port,\n n.network,\n n.first_seen_at,\n CAST(COALESCE(stats.success_count, 0) AS INTEGER) as \"success_count!: i64\",\n CAST(COALESCE(stats.failure_count, 0) AS INTEGER) as \"failure_count!: i64\",\n stats.last_success as \"last_success?: String\",\n stats.last_failure as \"last_failure?: String\",\n stats.last_checked as \"last_checked?: String\",\n CAST(CASE WHEN reliable_nodes.node_id IS NOT NULL THEN 1 ELSE 0 END AS INTEGER) as \"is_reliable!: i64\",\n stats.avg_latency_ms as \"avg_latency_ms?: f64\",\n stats.min_latency_ms as \"min_latency_ms?: f64\",\n stats.max_latency_ms as \"max_latency_ms?: f64\",\n stats.last_latency_ms as \"last_latency_ms?: f64\"\n FROM monero_nodes n\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,\n MAX(CASE WHEN was_successful THEN timestamp END) as last_success,\n MAX(CASE WHEN NOT was_successful THEN timestamp END) as last_failure,\n MAX(timestamp) as last_checked,\n AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms,\n MIN(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as min_latency_ms,\n MAX(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as max_latency_ms,\n (SELECT latency_ms FROM health_checks hc2 WHERE hc2.node_id = health_checks.node_id ORDER BY timestamp DESC LIMIT 1) as last_latency_ms\n FROM health_checks \n GROUP BY node_id\n ) stats ON n.id = stats.node_id\n LEFT JOIN (\n SELECT DISTINCT node_id FROM (\n SELECT \n n2.id as node_id,\n COALESCE(s2.success_count, 0) as success_count,\n COALESCE(s2.failure_count, 0) as failure_count,\n s2.avg_latency_ms,\n (CAST(COALESCE(s2.success_count, 0) AS REAL) / CAST(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0) AS REAL)) * \n (MIN(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0), 200) / 200.0) * 0.8 +\n CASE \n WHEN s2.avg_latency_ms IS NOT NULL THEN (1.0 - (MIN(s2.avg_latency_ms, 2000) / 2000.0)) * 0.2\n ELSE 0.0 \n END as reliability_score\n FROM monero_nodes n2\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,\n AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms\n FROM health_checks \n GROUP BY node_id\n ) s2 ON n2.id = s2.node_id\n WHERE n2.network = ? AND (COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0)) > 0\n ORDER BY reliability_score DESC\n LIMIT 4\n )\n ) reliable_nodes ON n.id = reliable_nodes.node_id\n WHERE n.network = ?\n ORDER BY RANDOM()\n LIMIT ?\n ", - "describe": { - "columns": [ - { - "name": "id!: i64", - "ordinal": 0, - "type_info": "Integer" - }, - { - "name": "scheme", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "host", - "ordinal": 2, - "type_info": "Text" - }, - { - "name": "port", - "ordinal": 3, - "type_info": "Integer" - }, - { - "name": "network", - "ordinal": 4, - "type_info": "Text" - }, - { - "name": "first_seen_at", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "success_count!: i64", - "ordinal": 6, - "type_info": "Null" - }, - { - "name": "failure_count!: i64", - "ordinal": 7, - "type_info": "Null" - }, - { - "name": "last_success?: String", - "ordinal": 8, - "type_info": "Null" - }, - { - "name": "last_failure?: String", - "ordinal": 9, - "type_info": "Null" - }, - { - "name": "last_checked?: String", - "ordinal": 10, - "type_info": "Null" - }, - { - "name": "is_reliable!: i64", - "ordinal": 11, - "type_info": "Null" - }, - { - "name": "avg_latency_ms?: f64", - "ordinal": 12, - "type_info": "Null" - }, - { - "name": "min_latency_ms?: f64", - "ordinal": 13, - "type_info": "Null" - }, - { - "name": "max_latency_ms?: f64", - "ordinal": 14, - "type_info": "Null" - }, - { - "name": "last_latency_ms?: f64", - "ordinal": 15, - "type_info": "Float" - } - ], - "parameters": { - "Right": 3 - }, - "nullable": [ - true, - false, - false, - false, - false, - false, - null, - null, - null, - null, - null, - null, - null, - null, - null, - true - ] - }, - "hash": "9f6d042ab61e1d3d652d85c7d77d86a847c4a25d4ee0eab57380d10b94d2686d" -} diff --git a/monero-rpc-pool/.sqlx/query-b6d85d42bf72888afa22e27710e8cfe3885ed226ae6ae02d6585c1f2f4140d68.json b/monero-rpc-pool/.sqlx/query-b6d85d42bf72888afa22e27710e8cfe3885ed226ae6ae02d6585c1f2f4140d68.json deleted file mode 100644 index 50ed040f..00000000 --- a/monero-rpc-pool/.sqlx/query-b6d85d42bf72888afa22e27710e8cfe3885ed226ae6ae02d6585c1f2f4140d68.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n UPDATE monero_nodes \n SET network = ?, updated_at = ?\n WHERE scheme = ? AND host = ? AND port = ?\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 5 - }, - "nullable": [] - }, - "hash": "b6d85d42bf72888afa22e27710e8cfe3885ed226ae6ae02d6585c1f2f4140d68" -} diff --git a/monero-rpc-pool/.sqlx/query-ffa1b76d20c86d6bea02bd03e5e7de159adbb7c7c0ef585ce4df9ec648bea7f8.json b/monero-rpc-pool/.sqlx/query-ffa1b76d20c86d6bea02bd03e5e7de159adbb7c7c0ef585ce4df9ec648bea7f8.json deleted file mode 100644 index 0ab81591..00000000 --- a/monero-rpc-pool/.sqlx/query-ffa1b76d20c86d6bea02bd03e5e7de159adbb7c7c0ef585ce4df9ec648bea7f8.json +++ /dev/null @@ -1,28 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n SELECT \n COUNT(*) as total,\n CAST(SUM(CASE WHEN stats.success_count > 0 THEN 1 ELSE 0 END) AS INTEGER) as \"reachable!: i64\",\n CAST((SELECT COUNT(*) FROM (\n SELECT n2.id\n FROM monero_nodes n2\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count,\n AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms\n FROM health_checks \n GROUP BY node_id\n ) s2 ON n2.id = s2.node_id\n WHERE n2.network = ? AND (COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0)) > 0\n ORDER BY \n (CAST(COALESCE(s2.success_count, 0) AS REAL) / CAST(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0) AS REAL)) * \n (MIN(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0), 200) / 200.0) * 0.8 +\n CASE \n WHEN s2.avg_latency_ms IS NOT NULL THEN (1.0 - (MIN(s2.avg_latency_ms, 2000) / 2000.0)) * 0.2\n ELSE 0.0 \n END DESC\n LIMIT 4\n )) AS INTEGER) as \"reliable!: i64\"\n FROM monero_nodes n\n LEFT JOIN (\n SELECT \n node_id,\n SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count,\n SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count\n FROM health_checks \n GROUP BY node_id\n ) stats ON n.id = stats.node_id\n WHERE n.network = ?\n ", - "describe": { - "columns": [ - { - "name": "total", - "ordinal": 0, - "type_info": "Integer" - }, - { - "name": "reachable!: i64", - "ordinal": 1, - "type_info": "Integer" - }, - { - "name": "reliable!: i64", - "ordinal": 2, - "type_info": "Integer" - } - ], - "parameters": { - "Right": 2 - }, - "nullable": [false, true, false] - }, - "hash": "ffa1b76d20c86d6bea02bd03e5e7de159adbb7c7c0ef585ce4df9ec648bea7f8" -} diff --git a/monero-rpc-pool/Cargo.toml b/monero-rpc-pool/Cargo.toml index aefd139b..8e3eb175 100644 --- a/monero-rpc-pool/Cargo.toml +++ b/monero-rpc-pool/Cargo.toml @@ -13,7 +13,6 @@ anyhow = "1" axum = { version = "0.7", features = ["macros"] } chrono = { version = "0.4", features = ["serde"] } clap = { version = "4.0", features = ["derive"] } -dirs = "5.0" futures = "0.3" monero = { version = "0.12", features = ["serde_support"] } monero-rpc = { path = "../monero-rpc" } diff --git a/monero-rpc-pool/migrations/20250628093515_add_default_nodes_from_feather.sql b/monero-rpc-pool/migrations/20250628093515_add_default_nodes_from_feather.sql new file mode 100644 index 00000000..ccf42a58 --- /dev/null +++ b/monero-rpc-pool/migrations/20250628093515_add_default_nodes_from_feather.sql @@ -0,0 +1,45 @@ +-- Adds the default nodes from Feather Wallet to the database +-- Clears older nodes from the database + +-- Delete all nodes from the database +DELETE FROM monero_nodes; + +-- Delete all health checks +DELETE FROM health_checks; + +-- Mainnet Nodes +INSERT OR IGNORE INTO monero_nodes (scheme, host, port, network, first_seen_at) VALUES +-- These support https +('https', 'node3-us.monero.love', 18081, 'mainnet', datetime('now')), +('https', 'xmr-node.cakewallet.com', 18081, 'mainnet', datetime('now')), +('https', 'node2.monerodevs.org', 18089, 'mainnet', datetime('now')), +('https', 'node3.monerodevs.org', 18089, 'mainnet', datetime('now')), +('https', 'node.sethforprivacy.com', 18089, 'mainnet', datetime('now')), +('https', 'xmr.stormycloud.org', 18089, 'mainnet', datetime('now')), +('https', 'node2-eu.monero.love', 18089, 'mainnet', datetime('now')), +('https', 'rucknium.me', 18081, 'mainnet', datetime('now')), +-- These do not support https +('http', 'singapore.node.xmr.pm', 18089, 'mainnet', datetime('now')), +('http', 'node.majesticbank.is', 18089, 'mainnet', datetime('now')), +('http', 'node.majesticbank.at', 18089, 'mainnet', datetime('now')), +('http', 'ravfx.its-a-node.org', 18081, 'mainnet', datetime('now')), +('http', 'ravfx2.its-a-node.org', 18089, 'mainnet', datetime('now')), +('http', 'selsta1.featherwallet.net', 18081, 'mainnet', datetime('now')), +('http', 'selsta2.featherwallet.net', 18081, 'mainnet', datetime('now')), +('http', 'node.trocador.app', 18089, 'mainnet', datetime('now')), +('http', 'node.xmr.ru', 18081, 'mainnet', datetime('now')); + + +-- Stagenet Nodes +INSERT OR IGNORE INTO monero_nodes (scheme, host, port, network, first_seen_at) VALUES +('https', 'node.sethforprivacy.com', 38089, 'stagenet', datetime('now')), +('https', 'xmr-lux.boldsuck.org', 38081, 'stagenet', datetime('now')), +('http', 'node2.sethforprivacy.com', 38089, 'stagenet', datetime('now')), +('http', 'stagenet.xmr-tw.org', 38081, 'stagenet', datetime('now')), +('http', 'singapore.node.xmr.pm', 38081, 'stagenet', datetime('now')), +('http', 'node.monerodevs.org', 38089, 'stagenet', datetime('now')), +('http', 'node2.monerodevs.org', 38089, 'stagenet', datetime('now')), +('http', 'node3.monerodevs.org', 38089, 'stagenet', datetime('now')), +('http', 'plowsoffjexmxalw73tkjmf422gq6575fc7vicuu4javzn2ynnte6tyd.onion', 38089, 'stagenet', datetime('now')), +('http', 'plowsof3t5hogddwabaeiyrno25efmzfxyro2vligremt7sxpsclfaid.onion', 38089, 'stagenet', datetime('now')), +('https', 'stagenet.xmr.ditatompel.com', 38081, 'stagenet', datetime('now')); \ No newline at end of file diff --git a/monero-rpc-pool/src/database.rs b/monero-rpc-pool/src/database.rs index e8fbea11..5e64ad32 100644 --- a/monero-rpc-pool/src/database.rs +++ b/monero-rpc-pool/src/database.rs @@ -1,108 +1,9 @@ use std::path::PathBuf; +use crate::types::{NodeAddress, NodeHealthStats, NodeMetadata, NodeRecord}; use anyhow::Result; -use dirs::data_dir; -use serde::{Deserialize, Serialize}; use sqlx::SqlitePool; -use tracing::{debug, info, warn}; - -#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] -pub struct MoneroNode { - pub id: Option, - pub scheme: String, // http or https - pub host: String, - pub port: i64, - pub network: String, // mainnet, stagenet, or testnet - always known at insertion time - pub first_seen_at: String, // ISO 8601 timestamp when first discovered - // Computed fields from health_checks (not stored in monero_nodes table) - #[sqlx(default)] - pub success_count: i64, - #[sqlx(default)] - pub failure_count: i64, - #[sqlx(default)] - pub last_success: Option, - #[sqlx(default)] - pub last_failure: Option, - #[sqlx(default)] - pub last_checked: Option, - #[sqlx(default)] - pub is_reliable: bool, - #[sqlx(default)] - pub avg_latency_ms: Option, - #[sqlx(default)] - pub min_latency_ms: Option, - #[sqlx(default)] - pub max_latency_ms: Option, - #[sqlx(default)] - pub last_latency_ms: Option, -} - -#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] -pub struct HealthCheck { - pub id: Option, - pub node_id: i64, - pub timestamp: String, // ISO 8601 timestamp - pub was_successful: bool, - pub latency_ms: Option, -} - -impl MoneroNode { - pub fn new(scheme: String, host: String, port: i64, network: String) -> Self { - // TODO: Do this in the database - let now = chrono::Utc::now().to_rfc3339(); - - Self { - id: None, - scheme, - host, - port, - network, - first_seen_at: now, - // These are computed from health_checks - success_count: 0, - failure_count: 0, - last_success: None, - last_failure: None, - last_checked: None, - is_reliable: false, - avg_latency_ms: None, - min_latency_ms: None, - max_latency_ms: None, - last_latency_ms: None, - } - } - - pub fn full_url(&self) -> String { - format!("{}://{}:{}", self.scheme, self.host, self.port) - } - - pub fn success_rate(&self) -> f64 { - let total = self.success_count + self.failure_count; - if total == 0 { - 0.0 - } else { - self.success_count as f64 / total as f64 - } - } - - pub fn reliability_score(&self) -> f64 { - let success_rate = self.success_rate(); - let total_requests = self.success_count + self.failure_count; - - // Weight success rate by total requests (more requests = more reliable data) - let request_weight = (total_requests as f64).min(200.0) / 200.0; - let mut score = success_rate * request_weight; - - // Factor in latency - lower latency = higher score - if let Some(avg_latency) = self.avg_latency_ms { - // Normalize latency to 0-1 range (assuming 0-2000ms range) - let latency_factor = 1.0 - (avg_latency.min(2000.0) / 2000.0); - score = score * 0.8 + latency_factor * 0.2; // 80% success rate, 20% latency - } - - score - } -} +use tracing::{info, warn}; #[derive(Clone)] pub struct Database { @@ -110,19 +11,15 @@ pub struct Database { } impl Database { - pub async fn new() -> Result { - let app_data_dir = get_app_data_dir()?; - Self::new_with_data_dir(app_data_dir).await - } - - pub async fn new_with_data_dir(data_dir: PathBuf) -> Result { + pub async fn new(data_dir: PathBuf) -> Result { if !data_dir.exists() { std::fs::create_dir_all(&data_dir)?; info!("Created application data directory: {}", data_dir.display()); } let db_path = data_dir.join("nodes.db"); - info!("Using database at: {}", db_path.display()); + + info!("Using database at {}", db_path.display()); let database_url = format!("sqlite:{}?mode=rwc", db_path.display()); let pool = SqlitePool::connect(&database_url).await?; @@ -134,81 +31,9 @@ impl Database { } async fn migrate(&self) -> Result<()> { - // Run sqlx migrations sqlx::migrate!("./migrations").run(&self.pool).await?; info!("Database migration completed"); - Ok(()) - } - - /// Insert a node if it doesn't exist, return the node_id - pub async fn upsert_node( - &self, - scheme: &str, - host: &str, - port: i64, - network: &str, - ) -> Result { - let now = chrono::Utc::now().to_rfc3339(); - - let result = sqlx::query!( - r#" - INSERT INTO monero_nodes (scheme, host, port, network, first_seen_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?) - ON CONFLICT(scheme, host, port) DO UPDATE SET - network = excluded.network, - updated_at = excluded.updated_at - RETURNING id - "#, - scheme, - host, - port, - network, - now, - now - ) - .fetch_one(&self.pool) - .await?; - - Ok(result.id) - } - - /// Update a node's network after it has been identified - pub async fn update_node_network( - &self, - scheme: &str, - host: &str, - port: i64, - network: &str, - ) -> Result<()> { - let now = chrono::Utc::now().to_rfc3339(); - - let result = sqlx::query!( - r#" - UPDATE monero_nodes - SET network = ?, updated_at = ? - WHERE scheme = ? AND host = ? AND port = ? - "#, - network, - now, - scheme, - host, - port - ) - .execute(&self.pool) - .await?; - - if result.rows_affected() > 0 { - debug!( - "Updated network for node {}://{}:{} to {}", - scheme, host, port, network - ); - } else { - warn!( - "Failed to update network for node {}://{}:{}: not found", - scheme, host, port - ); - } Ok(()) } @@ -222,151 +47,34 @@ impl Database { was_successful: bool, latency_ms: Option, ) -> Result<()> { - let now = chrono::Utc::now().to_rfc3339(); - - // First get the node_id - let node_row = sqlx::query!( - "SELECT id FROM monero_nodes WHERE scheme = ? AND host = ? AND port = ?", + let result = sqlx::query!( + r#" + INSERT INTO health_checks (node_id, timestamp, was_successful, latency_ms) + SELECT id, datetime('now'), ?, ? + FROM monero_nodes + WHERE scheme = ? AND host = ? AND port = ? + "#, + was_successful, + latency_ms, scheme, host, port ) - .fetch_optional(&self.pool) - .await?; - - let node_id = match node_row { - Some(row) => row.id, - None => { - warn!( - "Cannot record health check for unknown node: {}://{}:{}", - scheme, host, port - ); - return Ok(()); - } - }; - - sqlx::query!( - r#" - INSERT INTO health_checks (node_id, timestamp, was_successful, latency_ms) - VALUES (?, ?, ?, ?) - "#, - node_id, - now, - was_successful, - latency_ms - ) .execute(&self.pool) .await?; + if result.rows_affected() == 0 { + warn!( + "Cannot record health check for unknown node: {}://{}:{}", + scheme, host, port + ); + } + Ok(()) } - /// Get nodes that have been identified (have network set) - pub async fn get_identified_nodes(&self, network: &str) -> Result> { - let rows = sqlx::query!( - r#" - SELECT - n.id as "id!: i64", - n.scheme, - n.host, - n.port, - n.network, - n.first_seen_at, - CAST(COALESCE(stats.success_count, 0) AS INTEGER) as "success_count!: i64", - CAST(COALESCE(stats.failure_count, 0) AS INTEGER) as "failure_count!: i64", - stats.last_success as "last_success?: String", - stats.last_failure as "last_failure?: String", - stats.last_checked as "last_checked?: String", - CAST(CASE WHEN reliable_nodes.node_id IS NOT NULL THEN 1 ELSE 0 END AS INTEGER) as "is_reliable!: i64", - stats.avg_latency_ms as "avg_latency_ms?: f64", - stats.min_latency_ms as "min_latency_ms?: f64", - stats.max_latency_ms as "max_latency_ms?: f64", - stats.last_latency_ms as "last_latency_ms?: f64" - FROM monero_nodes n - LEFT JOIN ( - SELECT - node_id, - SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count, - SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count, - MAX(CASE WHEN was_successful THEN timestamp END) as last_success, - MAX(CASE WHEN NOT was_successful THEN timestamp END) as last_failure, - MAX(timestamp) as last_checked, - AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms, - MIN(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as min_latency_ms, - MAX(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as max_latency_ms, - (SELECT latency_ms FROM health_checks hc2 WHERE hc2.node_id = health_checks.node_id ORDER BY timestamp DESC LIMIT 1) as last_latency_ms - FROM health_checks - GROUP BY node_id - ) stats ON n.id = stats.node_id - LEFT JOIN ( - SELECT DISTINCT node_id FROM ( - SELECT - n2.id as node_id, - COALESCE(s2.success_count, 0) as success_count, - COALESCE(s2.failure_count, 0) as failure_count, - s2.avg_latency_ms, - (CAST(COALESCE(s2.success_count, 0) AS REAL) / CAST(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0) AS REAL)) * - (MIN(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0), 200) / 200.0) * 0.8 + - CASE - WHEN s2.avg_latency_ms IS NOT NULL THEN (1.0 - (MIN(s2.avg_latency_ms, 2000) / 2000.0)) * 0.2 - ELSE 0.0 - END as reliability_score - FROM monero_nodes n2 - LEFT JOIN ( - SELECT - node_id, - SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count, - SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count, - AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms - FROM health_checks - GROUP BY node_id - ) s2 ON n2.id = s2.node_id - WHERE n2.network = ? AND (COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0)) > 0 - ORDER BY reliability_score DESC - LIMIT 4 - ) - ) reliable_nodes ON n.id = reliable_nodes.node_id - WHERE n.network = ? - ORDER BY stats.avg_latency_ms ASC, stats.success_count DESC - "#, - network, - network - ) - .fetch_all(&self.pool) - .await?; - - let nodes: Vec = rows - .into_iter() - .map(|row| MoneroNode { - id: Some(row.id), - scheme: row.scheme, - host: row.host, - port: row.port, - network: row.network, - first_seen_at: row.first_seen_at, - success_count: row.success_count, - failure_count: row.failure_count, - last_success: row.last_success, - last_failure: row.last_failure, - last_checked: row.last_checked, - is_reliable: row.is_reliable != 0, - avg_latency_ms: row.avg_latency_ms, - min_latency_ms: row.min_latency_ms, - max_latency_ms: row.max_latency_ms, - last_latency_ms: row.last_latency_ms, - }) - .collect(); - - debug!( - "Retrieved {} identified nodes for network {}", - nodes.len(), - network - ); - Ok(nodes) - } - /// Get reliable nodes (top 4 by reliability score) - pub async fn get_reliable_nodes(&self, network: &str) -> Result> { + pub async fn get_reliable_nodes(&self, network: &str) -> Result> { let rows = sqlx::query!( r#" SELECT @@ -417,25 +125,28 @@ impl Database { .fetch_all(&self.pool) .await?; - let nodes = rows + let nodes: Vec = rows .into_iter() - .map(|row| MoneroNode { - id: Some(row.id), - scheme: row.scheme, - host: row.host, - port: row.port, - network: row.network, - first_seen_at: row.first_seen_at, - success_count: row.success_count, - failure_count: row.failure_count, - last_success: row.last_success, - last_failure: row.last_failure, - last_checked: row.last_checked, - is_reliable: true, - avg_latency_ms: row.avg_latency_ms, - min_latency_ms: row.min_latency_ms, - max_latency_ms: row.max_latency_ms, - last_latency_ms: row.last_latency_ms, + .map(|row| { + let address = NodeAddress::new(row.scheme, row.host, row.port as u16); + let first_seen_at = row + .first_seen_at + .parse() + .unwrap_or_else(|_| chrono::Utc::now()); + + let metadata = NodeMetadata::new(row.id, row.network, first_seen_at); + let health = NodeHealthStats { + success_count: row.success_count, + failure_count: row.failure_count, + last_success: row.last_success.and_then(|s| s.parse().ok()), + last_failure: row.last_failure.and_then(|s| s.parse().ok()), + last_checked: row.last_checked.and_then(|s| s.parse().ok()), + avg_latency_ms: row.avg_latency_ms, + min_latency_ms: row.min_latency_ms, + max_latency_ms: row.max_latency_ms, + last_latency_ms: row.last_latency_ms, + }; + NodeRecord::new(address, metadata, health) }) .collect(); @@ -449,28 +160,7 @@ impl Database { SELECT COUNT(*) as total, CAST(SUM(CASE WHEN stats.success_count > 0 THEN 1 ELSE 0 END) AS INTEGER) as "reachable!: i64", - CAST((SELECT COUNT(*) FROM ( - SELECT n2.id - FROM monero_nodes n2 - LEFT JOIN ( - SELECT - node_id, - SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count, - SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count, - AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms - FROM health_checks - GROUP BY node_id - ) s2 ON n2.id = s2.node_id - WHERE n2.network = ? AND (COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0)) > 0 - ORDER BY - (CAST(COALESCE(s2.success_count, 0) AS REAL) / CAST(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0) AS REAL)) * - (MIN(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0), 200) / 200.0) * 0.8 + - CASE - WHEN s2.avg_latency_ms IS NOT NULL THEN (1.0 - (MIN(s2.avg_latency_ms, 2000) / 2000.0)) * 0.2 - ELSE 0.0 - END DESC - LIMIT 4 - )) AS INTEGER) as "reliable!: i64" + CAST(SUM(CASE WHEN stats.success_count > stats.failure_count AND stats.success_count > 0 THEN 1 ELSE 0 END) AS INTEGER) as "reliable!: i64" FROM monero_nodes n LEFT JOIN ( SELECT @@ -482,17 +172,12 @@ impl Database { ) stats ON n.id = stats.node_id WHERE n.network = ? "#, - network, network ) .fetch_one(&self.pool) .await?; - let total = row.total; - let reachable = row.reachable; - let reliable = row.reliable; - - Ok((total, reachable, reliable)) + Ok((row.total, row.reachable, row.reliable)) } /// Get health check statistics for a network @@ -522,444 +207,52 @@ impl Database { Ok((successful, unsuccessful)) } - /// Get top nodes based on recent success rate and latency + /// Get top nodes based on success rate pub async fn get_top_nodes_by_recent_success( &self, network: &str, - _recent_checks_limit: i64, limit: i64, - ) -> Result> { + ) -> Result> { let rows = sqlx::query!( r#" SELECT - n.id as "id!: i64", n.scheme, n.host, - n.port, - n.network, - n.first_seen_at, - CAST(COALESCE(stats.success_count, 0) AS INTEGER) as "success_count!: i64", - CAST(COALESCE(stats.failure_count, 0) AS INTEGER) as "failure_count!: i64", - stats.last_success as "last_success?: String", - stats.last_failure as "last_failure?: String", - stats.last_checked as "last_checked?: String", - CAST(CASE WHEN reliable_nodes.node_id IS NOT NULL THEN 1 ELSE 0 END AS INTEGER) as "is_reliable!: i64", - stats.avg_latency_ms as "avg_latency_ms?: f64", - stats.min_latency_ms as "min_latency_ms?: f64", - stats.max_latency_ms as "max_latency_ms?: f64", - stats.last_latency_ms as "last_latency_ms?: f64" + n.port FROM monero_nodes n LEFT JOIN ( SELECT node_id, SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count, - SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count, - MAX(CASE WHEN was_successful THEN timestamp END) as last_success, - MAX(CASE WHEN NOT was_successful THEN timestamp END) as last_failure, - MAX(timestamp) as last_checked, - AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms, - MIN(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as min_latency_ms, - MAX(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as max_latency_ms, - (SELECT latency_ms FROM health_checks hc2 WHERE hc2.node_id = health_checks.node_id ORDER BY timestamp DESC LIMIT 1) as last_latency_ms - FROM health_checks + SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count + FROM ( + SELECT node_id, was_successful + FROM health_checks + ORDER BY timestamp DESC + LIMIT 1000 + ) recent_checks GROUP BY node_id ) stats ON n.id = stats.node_id - LEFT JOIN ( - SELECT DISTINCT node_id FROM ( - SELECT - n2.id as node_id, - COALESCE(s2.success_count, 0) as success_count, - COALESCE(s2.failure_count, 0) as failure_count, - s2.avg_latency_ms, - (CAST(COALESCE(s2.success_count, 0) AS REAL) / CAST(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0) AS REAL)) * - (MIN(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0), 200) / 200.0) * 0.8 + - CASE - WHEN s2.avg_latency_ms IS NOT NULL THEN (1.0 - (MIN(s2.avg_latency_ms, 2000) / 2000.0)) * 0.2 - ELSE 0.0 - END as reliability_score - FROM monero_nodes n2 - LEFT JOIN ( - SELECT - node_id, - SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count, - SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count, - AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms - FROM health_checks - GROUP BY node_id - ) s2 ON n2.id = s2.node_id - WHERE n2.network = ? AND (COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0)) > 0 - ORDER BY reliability_score DESC - LIMIT 4 - ) - ) reliable_nodes ON n.id = reliable_nodes.node_id - WHERE n.network = ? AND (COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0)) > 0 + WHERE n.network = ? ORDER BY - (CAST(COALESCE(stats.success_count, 0) AS REAL) / CAST(COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0) AS REAL)) DESC, - stats.avg_latency_ms ASC + CASE + WHEN (COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0)) > 0 + THEN CAST(COALESCE(stats.success_count, 0) AS REAL) / CAST(COALESCE(stats.success_count, 0) + COALESCE(stats.failure_count, 0) AS REAL) + ELSE 0.0 + END DESC LIMIT ? "#, network, - network, limit ) .fetch_all(&self.pool) .await?; - let nodes = rows + let addresses: Vec = rows .into_iter() - .map(|row| MoneroNode { - id: Some(row.id), - scheme: row.scheme, - host: row.host, - port: row.port, - network: row.network, - first_seen_at: row.first_seen_at, - success_count: row.success_count, - failure_count: row.failure_count, - last_success: row.last_success, - last_failure: row.last_failure, - last_checked: row.last_checked, - is_reliable: row.is_reliable != 0, - avg_latency_ms: row.avg_latency_ms, - min_latency_ms: row.min_latency_ms, - max_latency_ms: row.max_latency_ms, - last_latency_ms: row.last_latency_ms, - }) + .map(|row| NodeAddress::new(row.scheme, row.host, row.port as u16)) .collect(); - Ok(nodes) - } - - /// Get identified nodes that have at least one successful health check - pub async fn get_identified_nodes_with_success( - &self, - network: &str, - ) -> Result> { - let rows = sqlx::query!( - r#" - SELECT - n.id as "id!: i64", - n.scheme, - n.host, - n.port, - n.network, - n.first_seen_at, - CAST(COALESCE(stats.success_count, 0) AS INTEGER) as "success_count!: i64", - CAST(COALESCE(stats.failure_count, 0) AS INTEGER) as "failure_count!: i64", - stats.last_success as "last_success?: String", - stats.last_failure as "last_failure?: String", - stats.last_checked as "last_checked?: String", - CAST(CASE WHEN reliable_nodes.node_id IS NOT NULL THEN 1 ELSE 0 END AS INTEGER) as "is_reliable!: i64", - stats.avg_latency_ms as "avg_latency_ms?: f64", - stats.min_latency_ms as "min_latency_ms?: f64", - stats.max_latency_ms as "max_latency_ms?: f64", - stats.last_latency_ms as "last_latency_ms?: f64" - FROM monero_nodes n - LEFT JOIN ( - SELECT - node_id, - SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count, - SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count, - MAX(CASE WHEN was_successful THEN timestamp END) as last_success, - MAX(CASE WHEN NOT was_successful THEN timestamp END) as last_failure, - MAX(timestamp) as last_checked, - AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms, - MIN(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as min_latency_ms, - MAX(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as max_latency_ms, - (SELECT latency_ms FROM health_checks hc2 WHERE hc2.node_id = health_checks.node_id ORDER BY timestamp DESC LIMIT 1) as last_latency_ms - FROM health_checks - GROUP BY node_id - ) stats ON n.id = stats.node_id - LEFT JOIN ( - SELECT DISTINCT node_id FROM ( - SELECT - n2.id as node_id, - COALESCE(s2.success_count, 0) as success_count, - COALESCE(s2.failure_count, 0) as failure_count, - s2.avg_latency_ms, - (CAST(COALESCE(s2.success_count, 0) AS REAL) / CAST(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0) AS REAL)) * - (MIN(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0), 200) / 200.0) * 0.8 + - CASE - WHEN s2.avg_latency_ms IS NOT NULL THEN (1.0 - (MIN(s2.avg_latency_ms, 2000) / 2000.0)) * 0.2 - ELSE 0.0 - END as reliability_score - FROM monero_nodes n2 - LEFT JOIN ( - SELECT - node_id, - SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count, - SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count, - AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms - FROM health_checks - GROUP BY node_id - ) s2 ON n2.id = s2.node_id - WHERE n2.network = ? AND (COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0)) > 0 - ORDER BY reliability_score DESC - LIMIT 4 - ) - ) reliable_nodes ON n.id = reliable_nodes.node_id - WHERE n.network = ? AND stats.success_count > 0 - ORDER BY stats.avg_latency_ms ASC, stats.success_count DESC - "#, - network, - network - ) - .fetch_all(&self.pool) - .await?; - - let nodes: Vec = rows - .into_iter() - .map(|row| MoneroNode { - id: Some(row.id), - scheme: row.scheme, - host: row.host, - port: row.port, - network: row.network, - first_seen_at: row.first_seen_at, - success_count: row.success_count, - failure_count: row.failure_count, - last_success: row.last_success, - last_failure: row.last_failure, - last_checked: row.last_checked, - is_reliable: row.is_reliable != 0, - avg_latency_ms: row.avg_latency_ms, - min_latency_ms: row.min_latency_ms, - max_latency_ms: row.max_latency_ms, - last_latency_ms: row.last_latency_ms, - }) - .collect(); - - debug!( - "Retrieved {} identified nodes with success for network {}", - nodes.len(), - network - ); - Ok(nodes) - } - - /// Get random nodes for the specified network, excluding specific IDs - pub async fn get_random_nodes( - &self, - network: &str, - limit: i64, - exclude_ids: &[i64], - ) -> Result> { - if exclude_ids.is_empty() { - let rows = sqlx::query!( - r#" - SELECT - n.id as "id!: i64", - n.scheme, - n.host, - n.port, - n.network, - n.first_seen_at, - CAST(COALESCE(stats.success_count, 0) AS INTEGER) as "success_count!: i64", - CAST(COALESCE(stats.failure_count, 0) AS INTEGER) as "failure_count!: i64", - stats.last_success as "last_success?: String", - stats.last_failure as "last_failure?: String", - stats.last_checked as "last_checked?: String", - CAST(CASE WHEN reliable_nodes.node_id IS NOT NULL THEN 1 ELSE 0 END AS INTEGER) as "is_reliable!: i64", - stats.avg_latency_ms as "avg_latency_ms?: f64", - stats.min_latency_ms as "min_latency_ms?: f64", - stats.max_latency_ms as "max_latency_ms?: f64", - stats.last_latency_ms as "last_latency_ms?: f64" - FROM monero_nodes n - LEFT JOIN ( - SELECT - node_id, - SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count, - SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count, - MAX(CASE WHEN was_successful THEN timestamp END) as last_success, - MAX(CASE WHEN NOT was_successful THEN timestamp END) as last_failure, - MAX(timestamp) as last_checked, - AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms, - MIN(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as min_latency_ms, - MAX(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as max_latency_ms, - (SELECT latency_ms FROM health_checks hc2 WHERE hc2.node_id = health_checks.node_id ORDER BY timestamp DESC LIMIT 1) as last_latency_ms - FROM health_checks - GROUP BY node_id - ) stats ON n.id = stats.node_id - LEFT JOIN ( - SELECT DISTINCT node_id FROM ( - SELECT - n2.id as node_id, - COALESCE(s2.success_count, 0) as success_count, - COALESCE(s2.failure_count, 0) as failure_count, - s2.avg_latency_ms, - (CAST(COALESCE(s2.success_count, 0) AS REAL) / CAST(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0) AS REAL)) * - (MIN(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0), 200) / 200.0) * 0.8 + - CASE - WHEN s2.avg_latency_ms IS NOT NULL THEN (1.0 - (MIN(s2.avg_latency_ms, 2000) / 2000.0)) * 0.2 - ELSE 0.0 - END as reliability_score - FROM monero_nodes n2 - LEFT JOIN ( - SELECT - node_id, - SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count, - SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count, - AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms - FROM health_checks - GROUP BY node_id - ) s2 ON n2.id = s2.node_id - WHERE n2.network = ? AND (COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0)) > 0 - ORDER BY reliability_score DESC - LIMIT 4 - ) - ) reliable_nodes ON n.id = reliable_nodes.node_id - WHERE n.network = ? - ORDER BY RANDOM() - LIMIT ? - "#, - network, - network, - limit - ) - .fetch_all(&self.pool) - .await?; - - return Ok(rows - .into_iter() - .map(|row| MoneroNode { - id: Some(row.id), - scheme: row.scheme, - host: row.host, - port: row.port, - network: row.network, - first_seen_at: row.first_seen_at, - success_count: row.success_count, - failure_count: row.failure_count, - last_success: row.last_success, - last_failure: row.last_failure, - last_checked: row.last_checked, - is_reliable: row.is_reliable != 0, - avg_latency_ms: row.avg_latency_ms, - min_latency_ms: row.min_latency_ms, - max_latency_ms: row.max_latency_ms, - last_latency_ms: row.last_latency_ms, - }) - .collect()); - } - - // If exclude_ids is not empty, we need to handle it differently - // For now, get all nodes and filter in Rust (can be optimized with dynamic SQL) - let fetch_limit = limit + exclude_ids.len() as i64 + 10; // Get extra to account for exclusions - let all_rows = sqlx::query!( - r#" - SELECT - n.id as "id!: i64", - n.scheme, - n.host, - n.port, - n.network, - n.first_seen_at, - CAST(COALESCE(stats.success_count, 0) AS INTEGER) as "success_count!: i64", - CAST(COALESCE(stats.failure_count, 0) AS INTEGER) as "failure_count!: i64", - stats.last_success as "last_success?: String", - stats.last_failure as "last_failure?: String", - stats.last_checked as "last_checked?: String", - CAST(CASE WHEN reliable_nodes.node_id IS NOT NULL THEN 1 ELSE 0 END AS INTEGER) as "is_reliable!: i64", - stats.avg_latency_ms as "avg_latency_ms?: f64", - stats.min_latency_ms as "min_latency_ms?: f64", - stats.max_latency_ms as "max_latency_ms?: f64", - stats.last_latency_ms as "last_latency_ms?: f64" - FROM monero_nodes n - LEFT JOIN ( - SELECT - node_id, - SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count, - SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count, - MAX(CASE WHEN was_successful THEN timestamp END) as last_success, - MAX(CASE WHEN NOT was_successful THEN timestamp END) as last_failure, - MAX(timestamp) as last_checked, - AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms, - MIN(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as min_latency_ms, - MAX(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as max_latency_ms, - (SELECT latency_ms FROM health_checks hc2 WHERE hc2.node_id = health_checks.node_id ORDER BY timestamp DESC LIMIT 1) as last_latency_ms - FROM health_checks - GROUP BY node_id - ) stats ON n.id = stats.node_id - LEFT JOIN ( - SELECT DISTINCT node_id FROM ( - SELECT - n2.id as node_id, - COALESCE(s2.success_count, 0) as success_count, - COALESCE(s2.failure_count, 0) as failure_count, - s2.avg_latency_ms, - (CAST(COALESCE(s2.success_count, 0) AS REAL) / CAST(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0) AS REAL)) * - (MIN(COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0), 200) / 200.0) * 0.8 + - CASE - WHEN s2.avg_latency_ms IS NOT NULL THEN (1.0 - (MIN(s2.avg_latency_ms, 2000) / 2000.0)) * 0.2 - ELSE 0.0 - END as reliability_score - FROM monero_nodes n2 - LEFT JOIN ( - SELECT - node_id, - SUM(CASE WHEN was_successful THEN 1 ELSE 0 END) as success_count, - SUM(CASE WHEN NOT was_successful THEN 1 ELSE 0 END) as failure_count, - AVG(CASE WHEN was_successful AND latency_ms IS NOT NULL THEN latency_ms END) as avg_latency_ms - FROM health_checks - GROUP BY node_id - ) s2 ON n2.id = s2.node_id - WHERE n2.network = ? AND (COALESCE(s2.success_count, 0) + COALESCE(s2.failure_count, 0)) > 0 - ORDER BY reliability_score DESC - LIMIT 4 - ) - ) reliable_nodes ON n.id = reliable_nodes.node_id - WHERE n.network = ? - ORDER BY RANDOM() - LIMIT ? - "#, - network, - network, - fetch_limit - ) - .fetch_all(&self.pool) - .await?; - - // Convert exclude_ids to a HashSet for O(1) lookup - let exclude_set: std::collections::HashSet = exclude_ids.iter().cloned().collect(); - - let nodes: Vec = all_rows - .into_iter() - .filter(|row| !exclude_set.contains(&row.id)) - .take(limit as usize) - .map(|row| MoneroNode { - id: Some(row.id), - scheme: row.scheme, - host: row.host, - port: row.port, - network: row.network, - first_seen_at: row.first_seen_at, - success_count: row.success_count, - failure_count: row.failure_count, - last_success: row.last_success, - last_failure: row.last_failure, - last_checked: row.last_checked, - is_reliable: row.is_reliable != 0, - avg_latency_ms: row.avg_latency_ms, - min_latency_ms: row.min_latency_ms, - max_latency_ms: row.max_latency_ms, - last_latency_ms: row.last_latency_ms, - }) - .collect(); - - Ok(nodes) + Ok(addresses) } } - -pub fn get_app_data_dir() -> Result { - let base_dir = - data_dir().ok_or_else(|| anyhow::anyhow!("Could not determine system data directory"))?; - - let app_dir = base_dir.join("monero-rpc-pool"); - - if !app_dir.exists() { - std::fs::create_dir_all(&app_dir)?; - info!("Created application data directory: {}", app_dir.display()); - } - - Ok(app_dir) -} diff --git a/monero-rpc-pool/src/discovery.rs b/monero-rpc-pool/src/discovery.rs deleted file mode 100644 index b627777f..00000000 --- a/monero-rpc-pool/src/discovery.rs +++ /dev/null @@ -1,399 +0,0 @@ -use std::collections::HashSet; -use std::time::{Duration, Instant}; - -use anyhow::Result; -use monero::Network; -use rand::seq::SliceRandom; -use reqwest::Client; -use serde::Deserialize; -use serde_json::Value; -use tracing::{error, info, warn}; -use url; - -use crate::database::Database; - -#[derive(Debug, Deserialize)] -struct MoneroFailResponse { - monero: MoneroNodes, -} - -#[derive(Debug, Deserialize)] -struct MoneroNodes { - clear: Vec, - #[serde(default)] - web_compatible: Vec, -} - -#[derive(Debug)] -pub struct HealthCheckOutcome { - pub was_successful: bool, - pub latency: Duration, - pub discovered_network: Option, -} - -#[derive(Clone)] -pub struct NodeDiscovery { - client: Client, - db: Database, -} - -fn network_to_string(network: &Network) -> String { - match network { - Network::Mainnet => "mainnet".to_string(), - Network::Stagenet => "stagenet".to_string(), - Network::Testnet => "testnet".to_string(), - } -} - -impl NodeDiscovery { - pub fn new(db: Database) -> Result { - let client = Client::builder() - .timeout(Duration::from_secs(10)) - .user_agent("monero-rpc-pool/1.0") - .build() - .map_err(|e| anyhow::anyhow!("Failed to build HTTP client: {}", e))?; - - Ok(Self { client, db }) - } - - /// Fetch nodes from monero.fail API - pub async fn fetch_mainnet_nodes_from_api(&self) -> Result> { - let url = "https://monero.fail/nodes.json?chain=monero"; - - let response = self - .client - .get(url) - .timeout(Duration::from_secs(30)) - .send() - .await?; - - if !response.status().is_success() { - return Err(anyhow::anyhow!("HTTP error: {}", response.status())); - } - - let monero_fail_response: MoneroFailResponse = response.json().await?; - - // Combine clear and web_compatible nodes - let mut nodes = monero_fail_response.monero.web_compatible; - nodes.extend(monero_fail_response.monero.clear); - - // Remove duplicates using HashSet for O(n) complexity - let mut seen = HashSet::new(); - let mut unique_nodes = Vec::new(); - for node in nodes { - if seen.insert(node.clone()) { - unique_nodes.push(node); - } - } - - // Shuffle nodes in random order - let mut rng = rand::thread_rng(); - unique_nodes.shuffle(&mut rng); - - info!( - "Fetched {} mainnet nodes from monero.fail API", - unique_nodes.len() - ); - Ok(unique_nodes) - } - - /// Fetch nodes from monero.fail API and discover from other sources - pub async fn discover_nodes_from_sources(&self, target_network: Network) -> Result<()> { - // Only fetch from external sources for mainnet to avoid polluting test networks - if target_network == Network::Mainnet { - match self.fetch_mainnet_nodes_from_api().await { - Ok(nodes) => { - self.discover_and_insert_nodes(target_network, nodes) - .await?; - } - Err(e) => { - warn!("Failed to fetch nodes from monero.fail API: {}", e); - } - } - } - - Ok(()) - } - - /// Enhanced health check that detects network and validates node identity - pub async fn check_node_health( - &self, - scheme: &str, - host: &str, - port: i64, - ) -> Result { - let start_time = Instant::now(); - - let rpc_request = serde_json::json!({ - "jsonrpc": "2.0", - "id": "0", - "method": "get_info" - }); - - let node_url = format!("{}://{}:{}/json_rpc", scheme, host, port); - let response = self.client.post(&node_url).json(&rpc_request).send().await; - - let latency = start_time.elapsed(); - - match response { - Ok(resp) => { - if resp.status().is_success() { - match resp.json::().await { - Ok(json) => { - if let Some(result) = json.get("result") { - // Extract network information from get_info response - let discovered_network = self.extract_network_from_info(result); - - Ok(HealthCheckOutcome { - was_successful: true, - latency, - discovered_network, - }) - } else { - Ok(HealthCheckOutcome { - was_successful: false, - latency, - discovered_network: None, - }) - } - } - Err(_e) => Ok(HealthCheckOutcome { - was_successful: false, - latency, - discovered_network: None, - }), - } - } else { - Ok(HealthCheckOutcome { - was_successful: false, - latency, - discovered_network: None, - }) - } - } - Err(_e) => Ok(HealthCheckOutcome { - was_successful: false, - latency, - discovered_network: None, - }), - } - } - - /// Extract network type from get_info response - fn extract_network_from_info(&self, info_result: &Value) -> Option { - // Check nettype field (0 = mainnet, 1 = testnet, 2 = stagenet) - if let Some(nettype) = info_result.get("nettype").and_then(|v| v.as_u64()) { - return match nettype { - 0 => Some(Network::Mainnet), - 1 => Some(Network::Testnet), - 2 => Some(Network::Stagenet), - _ => None, - }; - } - - // Fallback: check if testnet or stagenet is mentioned in fields - if let Some(testnet) = info_result.get("testnet").and_then(|v| v.as_bool()) { - return if testnet { - Some(Network::Testnet) - } else { - Some(Network::Mainnet) - }; - } - - // Additional heuristics could be added here - None - } - - /// Updated health check workflow with identification and validation logic - pub async fn health_check_all_nodes(&self, target_network: Network) -> Result<()> { - info!( - "Starting health check for all nodes targeting network: {}", - network_to_string(&target_network) - ); - - // Get all nodes from database with proper field mapping - let all_nodes = sqlx::query!( - r#" - SELECT - id as "id!: i64", - scheme, - host, - port, - network as "network!: String", - first_seen_at - FROM monero_nodes - ORDER BY id - "# - ) - .fetch_all(&self.db.pool) - .await?; - - let mut checked_count = 0; - let mut healthy_count = 0; - let mut corrected_count = 0; - - for node in all_nodes { - match self - .check_node_health(&node.scheme, &node.host, node.port) - .await - { - Ok(outcome) => { - // Always record the health check - self.db - .record_health_check( - &node.scheme, - &node.host, - node.port, - outcome.was_successful, - if outcome.was_successful { - Some(outcome.latency.as_millis() as f64) - } else { - None - }, - ) - .await?; - - if outcome.was_successful { - healthy_count += 1; - - // Validate network consistency - if let Some(discovered_network) = outcome.discovered_network { - let discovered_network_str = network_to_string(&discovered_network); - if node.network != discovered_network_str { - let node_url = - format!("{}://{}:{}", node.scheme, node.host, node.port); - warn!("Network mismatch detected for node {}: stored={}, discovered={}. Correcting...", - node_url, node.network, discovered_network_str); - self.db - .update_node_network( - &node.scheme, - &node.host, - node.port, - &discovered_network_str, - ) - .await?; - corrected_count += 1; - } - } - } - checked_count += 1; - } - Err(_e) => { - self.db - .record_health_check(&node.scheme, &node.host, node.port, false, None) - .await?; - } - } - - // Small delay to avoid hammering nodes - tokio::time::sleep(Duration::from_secs(2)).await; - } - - info!( - "Health check completed: {}/{} nodes healthy, {} corrected", - healthy_count, checked_count, corrected_count - ); - - Ok(()) - } - - /// Periodic discovery task with improved error handling - pub async fn periodic_discovery_task(&self, target_network: Network) -> Result<()> { - let mut interval = tokio::time::interval(Duration::from_secs(3600)); // Every hour - - loop { - interval.tick().await; - - info!( - "Running periodic node discovery for network: {}", - network_to_string(&target_network) - ); - - // Discover new nodes from sources - if let Err(e) = self.discover_nodes_from_sources(target_network).await { - error!("Failed to discover nodes: {}", e); - } - - // Health check all nodes (will identify networks automatically) - if let Err(e) = self.health_check_all_nodes(target_network).await { - error!("Failed to perform health check: {}", e); - } - - // Log stats for all networks - for network in &[Network::Mainnet, Network::Stagenet, Network::Testnet] { - let network_str = network_to_string(network); - if let Ok((total, reachable, reliable)) = self.db.get_node_stats(&network_str).await - { - if total > 0 { - info!( - "Node stats for {}: {} total, {} reachable, {} reliable", - network_str, total, reachable, reliable - ); - } - } - } - } - } - - /// Insert configured nodes for a specific network - pub async fn discover_and_insert_nodes( - &self, - target_network: Network, - nodes: Vec, - ) -> Result<()> { - let mut success_count = 0; - let mut error_count = 0; - let target_network_str = network_to_string(&target_network); - - for node_url in nodes.iter() { - if let Ok(url) = url::Url::parse(node_url) { - let scheme = url.scheme(); - - // Validate scheme - must be http or https - if !matches!(scheme, "http" | "https") { - continue; - } - - // Validate host - must be non-empty - let Some(host) = url.host_str() else { - continue; - }; - if host.is_empty() { - continue; - } - - // Validate port - must be present - let Some(port) = url.port() else { - continue; - }; - let port = port as i64; - - match self - .db - .upsert_node(scheme, host, port, &target_network_str) - .await - { - Ok(_) => { - success_count += 1; - } - Err(e) => { - error_count += 1; - error!( - "Failed to insert configured node {}://{}:{}: {}", - scheme, host, port, e - ); - } - } - } else { - error_count += 1; - error!("Failed to parse node URL: {}", node_url); - } - } - - info!( - "Configured node insertion complete: {} successful, {} errors", - success_count, error_count - ); - Ok(()) - } -} diff --git a/monero-rpc-pool/src/lib.rs b/monero-rpc-pool/src/lib.rs index f03dd1f3..e33cf479 100644 --- a/monero-rpc-pool/src/lib.rs +++ b/monero-rpc-pool/src/lib.rs @@ -6,46 +6,49 @@ use axum::{ Router, }; use monero::Network; -use tokio::sync::RwLock; + use tokio::task::JoinHandle; use tower_http::cors::CorsLayer; use tracing::{error, info}; -fn network_to_string(network: &Network) -> String { - match network { - Network::Mainnet => "mainnet".to_string(), - Network::Stagenet => "stagenet".to_string(), - Network::Testnet => "testnet".to_string(), +pub trait ToNetworkString { + fn to_network_string(&self) -> String; +} + +impl ToNetworkString for Network { + fn to_network_string(&self) -> String { + match self { + Network::Mainnet => "mainnet".to_string(), + Network::Stagenet => "stagenet".to_string(), + Network::Testnet => "testnet".to_string(), + } } } pub mod config; pub mod database; -pub mod discovery; pub mod pool; -pub mod simple_handlers; +pub mod proxy; +pub mod types; use config::Config; use database::Database; -use discovery::NodeDiscovery; use pool::{NodePool, PoolStatus}; -use simple_handlers::{simple_proxy_handler, simple_stats_handler}; +use proxy::{proxy_handler, stats_handler}; #[derive(Clone)] pub struct AppState { - pub node_pool: Arc>, + pub node_pool: Arc, } /// Manages background tasks for the RPC pool pub struct PoolHandle { pub status_update_handle: JoinHandle<()>, - pub discovery_handle: JoinHandle<()>, } impl Drop for PoolHandle { fn drop(&mut self) { self.status_update_handle.abort(); - self.discovery_handle.abort(); } } @@ -65,64 +68,41 @@ async fn create_app_with_receiver( PoolHandle, )> { // Initialize database - let db = Database::new_with_data_dir(config.data_dir.clone()).await?; + let db = Database::new(config.data_dir.clone()).await?; // Initialize node pool with network - let network_str = network_to_string(&network); + let network_str = network.to_network_string(); let (node_pool, status_receiver) = NodePool::new(db.clone(), network_str.clone()); - let node_pool = Arc::new(RwLock::new(node_pool)); - - // Initialize discovery service - let discovery = NodeDiscovery::new(db.clone())?; + let node_pool = Arc::new(node_pool); // Publish initial status immediately to ensure first event is sent - { - let pool_guard = node_pool.read().await; - if let Err(e) = pool_guard.publish_status_update().await { - error!("Failed to publish initial status update: {}", e); - } + if let Err(e) = node_pool.publish_status_update().await { + error!("Failed to publish initial status update: {}", e); } - // Start background tasks + // Send status updates every 10 seconds + let mut interval = tokio::time::interval(std::time::Duration::from_secs(10)); let node_pool_for_health_check = node_pool.clone(); let status_update_handle = tokio::spawn(async move { - let mut interval = tokio::time::interval(std::time::Duration::from_secs(10)); - loop { - interval.tick().await; - - // Publish status update - let pool_guard = node_pool_for_health_check.read().await; - if let Err(e) = pool_guard.publish_status_update().await { + if let Err(e) = node_pool_for_health_check.publish_status_update().await { error!("Failed to publish status update: {}", e); } - } - }); - // Start periodic discovery task - let discovery_clone = discovery.clone(); - let network_clone = network; - let discovery_handle = tokio::spawn(async move { - if let Err(e) = discovery_clone.periodic_discovery_task(network_clone).await { - error!( - "Periodic discovery task failed for network {}: {}", - network_to_string(&network_clone), - e - ); + interval.tick().await; } }); let pool_handle = PoolHandle { status_update_handle, - discovery_handle, }; let app_state = AppState { node_pool }; // Build the app let app = Router::new() - .route("/stats", get(simple_stats_handler)) - .route("/*path", any(simple_proxy_handler)) + .route("/stats", get(stats_handler)) + .route("/*path", any(proxy_handler)) .layer(CorsLayer::permissive()) .with_state(app_state); diff --git a/monero-rpc-pool/src/main.rs b/monero-rpc-pool/src/main.rs index 9326c8bd..2b03fc08 100644 --- a/monero-rpc-pool/src/main.rs +++ b/monero-rpc-pool/src/main.rs @@ -1,10 +1,7 @@ use clap::Parser; -use tracing::{info, warn}; -use tracing_subscriber::{self, EnvFilter}; - -use monero_rpc_pool::database::Database; -use monero_rpc_pool::discovery::NodeDiscovery; use monero_rpc_pool::{config::Config, run_server}; +use tracing::info; +use tracing_subscriber::{self, EnvFilter}; use monero::Network; @@ -20,6 +17,7 @@ fn parse_network(s: &str) -> Result { } } +// TODO: Replace with Display impl for Network fn network_to_string(network: &Network) -> String { match network { Network::Mainnet => "mainnet".to_string(), @@ -41,10 +39,6 @@ struct Args { #[arg(help = "Port to bind the server to")] port: u16, - #[arg(long, value_delimiter = ',')] - #[arg(help = "Comma-separated list of Monero node URLs (overrides network-based discovery)")] - nodes: Option>, - #[arg(short, long, default_value = "mainnet")] #[arg(help = "Network to use for automatic node discovery")] #[arg(value_parser = parse_network)] @@ -55,117 +49,28 @@ struct Args { verbose: bool, } -// Custom filter function that overrides log levels for our crate -fn create_level_override_filter(base_filter: &str) -> EnvFilter { - // Parse the base filter and modify it to treat all monero_rpc_pool logs as trace - let mut filter = EnvFilter::new(base_filter); - - // Add a directive that treats all levels from our crate as trace - filter = filter.add_directive("monero_rpc_pool=trace".parse().unwrap()); - - filter -} - #[tokio::main] async fn main() -> Result<(), Box> { let args = Args::parse(); - // Create a filter that treats all logs from our crate as traces - let base_filter = if args.verbose { - // In verbose mode, show logs from other crates at WARN level - "warn" - } else { - // In normal mode, show logs from other crates at ERROR level - "error" - }; - - let filter = create_level_override_filter(base_filter); - tracing_subscriber::fmt() - .with_env_filter(filter) + .with_env_filter(EnvFilter::new("trace")) .with_target(false) .with_file(true) .with_line_number(true) .init(); - // Store node count for later logging before potentially moving args.nodes - let manual_node_count = args.nodes.as_ref().map(|nodes| nodes.len()); - - // Determine nodes to use and set up discovery - let _nodes = if let Some(manual_nodes) = args.nodes { - info!( - "Using manually specified nodes for network: {}", - network_to_string(&args.network) - ); - - // Insert manual nodes into database with network information - let db = Database::new().await?; - let discovery = NodeDiscovery::new(db.clone())?; - let mut parsed_nodes = Vec::new(); - - for node_url in &manual_nodes { - // Parse the URL to extract components - if let Ok(url) = url::Url::parse(node_url) { - let scheme = url.scheme().to_string(); - let _protocol = if scheme == "https" { "ssl" } else { "tcp" }; - let host = url.host_str().unwrap_or("").to_string(); - let port = url - .port() - .unwrap_or(if scheme == "https" { 443 } else { 80 }) - as i64; - - let full_url = format!("{}://{}:{}", scheme, host, port); - - // Insert into database - if let Err(e) = db - .upsert_node(&scheme, &host, port, &network_to_string(&args.network)) - .await - { - warn!("Failed to insert manual node {}: {}", node_url, e); - } else { - parsed_nodes.push(full_url); - } - } else { - warn!("Failed to parse manual node URL: {}", node_url); - } - } - - // Use manual nodes for discovery - discovery - .discover_and_insert_nodes(args.network, manual_nodes) - .await?; - parsed_nodes - } else { - info!( - "Setting up automatic node discovery for {} network", - network_to_string(&args.network) - ); - let db = Database::new().await?; - let discovery = NodeDiscovery::new(db.clone())?; - - // Start discovery process - discovery.discover_nodes_from_sources(args.network).await?; - Vec::new() // Return empty vec for consistency - }; - let config = Config::new_with_port( args.host, args.port, std::env::temp_dir().join("monero-rpc-pool"), ); - let node_count_msg = if args.verbose { - match manual_node_count { - Some(count) => format!("{} manual nodes configured", count), - None => "using automatic discovery".to_string(), - } - } else { - "configured".to_string() - }; - info!( - "Starting Monero RPC Pool\nConfiguration:\n Host: {}\n Port: {}\n Network: {}\n Nodes: {}", - config.host, config.port, network_to_string(&args.network), node_count_msg + host = config.host, + port = config.port, + network = network_to_string(&args.network), + "Starting Monero RPC Pool" ); if let Err(e) = run_server(config, args.network).await { diff --git a/monero-rpc-pool/src/pool.rs b/monero-rpc-pool/src/pool.rs index a9b02c1d..0dd17f10 100644 --- a/monero-rpc-pool/src/pool.rs +++ b/monero-rpc-pool/src/pool.rs @@ -1,10 +1,10 @@ use anyhow::{Context, Result}; -use rand::prelude::*; use tokio::sync::broadcast; -use tracing::debug; +use tracing::{debug, warn}; use typeshare::typeshare; use crate::database::Database; +use crate::types::NodeAddress; #[derive(Debug, Clone, serde::Serialize)] #[typeshare] @@ -43,64 +43,6 @@ impl NodePool { (pool, status_receiver) } - /// Get next node using Power of Two Choices algorithm - /// Only considers identified nodes (nodes with network set) - pub async fn get_next_node(&self) -> Result> { - let candidate_nodes = self.db.get_identified_nodes(&self.network).await?; - - if candidate_nodes.is_empty() { - debug!("No identified nodes available for network {}", self.network); - return Ok(None); - } - - if candidate_nodes.len() == 1 { - return Ok(Some(candidate_nodes[0].full_url())); - } - - // Power of Two Choices: pick 2 random nodes, select the better one - let mut rng = thread_rng(); - let node1 = candidate_nodes.choose(&mut rng).unwrap(); - let node2 = candidate_nodes.choose(&mut rng).unwrap(); - - let selected = - if self.calculate_goodness_score(node1) >= self.calculate_goodness_score(node2) { - node1 - } else { - node2 - }; - - debug!( - "Selected node using P2C for network {}: {}", - self.network, - selected.full_url() - ); - - Ok(Some(selected.full_url())) - } - - /// Calculate goodness score based on usage-based recency - /// Score is a function of success rate and latency from last N health checks - fn calculate_goodness_score(&self, node: &crate::database::MoneroNode) -> f64 { - let total_checks = node.success_count + node.failure_count; - if total_checks == 0 { - return 0.0; - } - - let success_rate = node.success_count as f64 / total_checks as f64; - - // Weight by recency (more recent interactions = higher weight) - let recency_weight = (total_checks as f64).min(200.0) / 200.0; - let mut score = success_rate * recency_weight; - - // Factor in latency - lower latency = higher score - if let Some(avg_latency) = node.avg_latency_ms { - let latency_factor = 1.0 - (avg_latency.min(2000.0) / 2000.0); - score = score * 0.8 + latency_factor * 0.2; // 80% success rate, 20% latency - } - - score - } - pub async fn record_success( &self, scheme: &str, @@ -123,7 +65,13 @@ impl NodePool { pub async fn publish_status_update(&self) -> Result<()> { let status = self.get_current_status().await?; - let _ = self.status_sender.send(status); // Ignore if no receivers + + if let Err(e) = self.status_sender.send(status.clone()) { + warn!("Failed to send status update: {}", e); + } else { + debug!(?status, "Sent status update"); + } + Ok(()) } @@ -139,7 +87,7 @@ impl NodePool { .map(|node| ReliableNodeInfo { url: node.full_url(), success_rate: node.success_rate(), - avg_latency_ms: node.avg_latency_ms, + avg_latency_ms: node.health.avg_latency_ms, }) .collect(); @@ -152,81 +100,63 @@ impl NodePool { }) } - /// Get top reliable nodes with fill-up logic to ensure pool size - /// First tries to get top nodes based on recent success, then fills up with random nodes - pub async fn get_top_reliable_nodes( - &self, - limit: usize, - ) -> Result> { + /// Get nodes to use, with weighted selection favoring top performers + /// The list has some randomness, but the top nodes are still more likely to be chosen + pub async fn get_top_reliable_nodes(&self, limit: usize) -> Result> { + use rand::seq::SliceRandom; + debug!( "Getting top reliable nodes for network {} (target: {})", self.network, limit ); - // Step 1: Try primary fetch - get top nodes based on recent success (last 200 health checks) - let mut top_nodes = self + let available_nodes = self .db - .get_top_nodes_by_recent_success(&self.network, 200, limit as i64) + .get_top_nodes_by_recent_success(&self.network, limit as i64) .await .context("Failed to get top nodes by recent success")?; + let total_candidates = available_nodes.len(); + + let weighted: Vec<(NodeAddress, f64)> = available_nodes + .into_iter() + .enumerate() + .map(|(idx, node)| { + // Higher-ranked (smaller idx) ⇒ larger weight + let weight = 1.5_f64.powi((total_candidates - idx) as i32); + (node, weight) + }) + .collect(); + + let mut rng = rand::thread_rng(); + + let mut candidates = weighted; + let mut selected_nodes = Vec::with_capacity(limit); + + while selected_nodes.len() < limit && !candidates.is_empty() { + // Choose one node based on its weight using `choose_weighted` + let chosen_pair = candidates + .choose_weighted(&mut rng, |item| item.1) + .map_err(|e| anyhow::anyhow!("Weighted choice failed: {}", e))?; + + // Locate index of the chosen pair and remove it + let chosen_index = candidates + .iter() + .position(|x| std::ptr::eq(x, chosen_pair)) + .expect("Chosen item must exist in candidates"); + + let (node, _) = candidates.swap_remove(chosen_index); + selected_nodes.push(node); + } + debug!( - "Primary fetch returned {} nodes for network {} (target: {})", - top_nodes.len(), + "Pool size: {} nodes for network {} (target: {})", + selected_nodes.len(), self.network, limit ); - // Step 2: If primary fetch didn't return enough nodes, fall back to any identified nodes with successful health checks - if top_nodes.len() < limit { - debug!("Primary fetch returned insufficient nodes, falling back to any identified nodes with successful health checks"); - top_nodes = self - .db - .get_identified_nodes_with_success(&self.network) - .await?; - - debug!( - "Fallback fetch returned {} nodes with successful health checks for network {}", - top_nodes.len(), - self.network - ); - } - - // Step 3: Check if we still don't have enough nodes - if top_nodes.len() < limit { - let needed = limit - top_nodes.len(); - debug!( - "Pool needs {} more nodes to reach target of {} for network {}", - needed, limit, self.network - ); - - // Step 4: Collect exclusion IDs from nodes already selected - let exclude_ids: Vec = top_nodes.iter().filter_map(|node| node.id).collect(); - - // Step 5: Secondary fetch - get random nodes to fill up - let random_fillers = self - .db - .get_random_nodes(&self.network, needed as i64, &exclude_ids) - .await?; - - debug!( - "Secondary fetch returned {} random nodes for network {}", - random_fillers.len(), - self.network - ); - - // Step 6: Combine lists - top_nodes.extend(random_fillers); - } - - debug!( - "Final pool size: {} nodes for network {} (target: {})", - top_nodes.len(), - self.network, - limit - ); - - Ok(top_nodes) + Ok(selected_nodes) } pub async fn get_pool_stats(&self) -> Result { @@ -238,11 +168,11 @@ impl NodePool { } else { let total_latency: f64 = reliable_nodes .iter() - .filter_map(|node| node.avg_latency_ms) + .filter_map(|node| node.health.avg_latency_ms) .sum(); let count = reliable_nodes .iter() - .filter(|node| node.avg_latency_ms.is_some()) + .filter(|node| node.health.avg_latency_ms.is_some()) .count(); if count > 0 { diff --git a/monero-rpc-pool/src/simple_handlers.rs b/monero-rpc-pool/src/proxy.rs similarity index 73% rename from monero-rpc-pool/src/simple_handlers.rs rename to monero-rpc-pool/src/proxy.rs index ff99ef07..f3962b63 100644 --- a/monero-rpc-pool/src/simple_handlers.rs +++ b/monero-rpc-pool/src/proxy.rs @@ -16,7 +16,7 @@ enum HandlerError { NoNodes, PoolError(String), RequestError(String), - AllRequestsFailed(Vec<(String, String)>), // Vec of (node_url, error_message) + AllRequestsFailed(Vec<(String, String)>), } impl std::fmt::Display for HandlerError { @@ -150,8 +150,8 @@ async fn raw_http_request( } async fn record_success(state: &AppState, scheme: &str, host: &str, port: i64, latency_ms: f64) { - let node_pool_guard = state.node_pool.read().await; - if let Err(e) = node_pool_guard + if let Err(e) = state + .node_pool .record_success(scheme, host, port, latency_ms) .await { @@ -163,8 +163,7 @@ async fn record_success(state: &AppState, scheme: &str, host: &str, port: i64, l } async fn record_failure(state: &AppState, scheme: &str, host: &str, port: i64) { - let node_pool_guard = state.node_pool.read().await; - if let Err(e) = node_pool_guard.record_failure(scheme, host, port).await { + if let Err(e) = state.node_pool.record_failure(scheme, host, port).await { error!( "Failed to record failure for {}://{}:{}: {}", scheme, host, port, e @@ -173,15 +172,12 @@ async fn record_failure(state: &AppState, scheme: &str, host: &str, port: i64) { } async fn single_raw_request( - state: &AppState, node_url: (String, String, i64), path: &str, method: &str, headers: &HeaderMap, body: Option<&[u8]>, ) -> Result<(Response, (String, String, i64), f64), HandlerError> { - let (scheme, host, port) = &node_url; - let start_time = Instant::now(); match raw_http_request(node_url.clone(), path, method, headers, body).await { @@ -199,42 +195,37 @@ async fn single_raw_request( .map_err(|e| HandlerError::RequestError(format!("{:#?}", e)))?; if is_jsonrpc_error(&body_bytes) { - record_failure(state, scheme, host, *port).await; return Err(HandlerError::RequestError("JSON-RPC error".to_string())); } // Reconstruct response with the body we consumed let response = Response::from_parts(parts, Body::from(body_bytes)); - record_success(state, scheme, host, *port, latency_ms).await; Ok((response, node_url, latency_ms)) } else { // For non-JSON-RPC endpoints, HTTP success is enough - record_success(state, scheme, host, *port, latency_ms).await; Ok((response, node_url, latency_ms)) } } else { // Non-200 status codes are failures - record_failure(state, scheme, host, *port).await; Err(HandlerError::RequestError(format!( "HTTP {}", response.status() ))) } } - Err(e) => { - record_failure(state, scheme, host, *port).await; - Err(e) - } + Err(e) => Err(e), } } -async fn race_requests( +async fn sequential_requests( state: &AppState, path: &str, method: &str, headers: &HeaderMap, body: Option<&[u8]>, ) -> Result { + const POOL_SIZE: usize = 20; + // Extract JSON-RPC method for better logging let jsonrpc_method = if path == "/json_rpc" { if let Some(body_data) = body { @@ -245,22 +236,21 @@ async fn race_requests( } else { None }; - const POOL_SIZE: usize = 20; - let mut tried_nodes = std::collections::HashSet::new(); - let mut pool_index = 0; + + let mut tried_nodes = 0; let mut collected_errors: Vec<(String, String)> = Vec::new(); - // Get the exclusive pool of 20 nodes once at the beginning + // Get the pool of nodes let available_pool = { - let node_pool_guard = state.node_pool.read().await; - let reliable_nodes = node_pool_guard + let nodes = state + .node_pool .get_top_reliable_nodes(POOL_SIZE) .await .map_err(|e| HandlerError::PoolError(e.to_string()))?; - let pool: Vec<(String, String, i64)> = reliable_nodes + let pool: Vec<(String, String, i64)> = nodes .into_iter() - .map(|node| (node.scheme, node.host, node.port)) + .map(|node| (node.scheme, node.host, node.port as i64)) .collect(); pool @@ -270,142 +260,59 @@ async fn race_requests( return Err(HandlerError::NoNodes); } - // Power of Two Choices within the exclusive pool - while pool_index < available_pool.len() && tried_nodes.len() < POOL_SIZE { - let mut node1_option = None; - let mut node2_option = None; - - // Select first untried node from pool - for (i, node) in available_pool.iter().enumerate().skip(pool_index) { - if !tried_nodes.contains(node) { - node1_option = Some(node.clone()); - pool_index = i + 1; - break; - } - } - - // Select second untried node from pool (different from first) - for node in available_pool.iter().skip(pool_index) { - if !tried_nodes.contains(node) && Some(node) != node1_option.as_ref() { - node2_option = Some(node.clone()); - break; - } - } - - // If we can't get any new nodes from the pool, we've exhausted our options - if node1_option.is_none() && node2_option.is_none() { - break; - } - - // Store node URLs for error tracking before consuming them - let current_nodes: Vec<(String, String, i64)> = [&node1_option, &node2_option] - .iter() - .filter_map(|opt| opt.as_ref()) - .cloned() - .collect(); - - let mut requests = Vec::new(); - - if let Some(node1) = node1_option { - tried_nodes.insert(node1.clone()); - requests.push(single_raw_request( - state, - node1.clone(), - path, - method, - headers, - body, - )); - } - - if let Some(node2) = node2_option { - tried_nodes.insert(node2.clone()); - requests.push(single_raw_request( - state, - node2.clone(), - path, - method, - headers, - body, - )); - } - - if requests.is_empty() { - break; - } + // Try nodes one by one sequentially + for node in available_pool.iter().take(POOL_SIZE) { + tried_nodes += 1; + let node_display = format!("{}://{}:{}", node.0, node.1, node.2); match &jsonrpc_method { Some(rpc_method) => debug!( - "Racing {} requests to {} (JSON-RPC: {}): {} nodes (tried {} so far)", + "Trying {} request to {} (JSON-RPC: {}) - attempt {} of {}", method, - path, + node_display, rpc_method, - requests.len(), - tried_nodes.len() + tried_nodes, + available_pool.len().min(POOL_SIZE) ), None => debug!( - "Racing {} requests to {}: {} nodes (tried {} so far)", + "Trying {} request to {} - attempt {} of {}", method, - path, - requests.len(), - tried_nodes.len() + node_display, + tried_nodes, + available_pool.len().min(POOL_SIZE) ), } - // Handle the requests based on how many we have - let result = match requests.len() { - 1 => { - // Only one request - requests.into_iter().next().unwrap().await - } - 2 => { - // Two requests - race them - let mut iter = requests.into_iter(); - let req1 = iter.next().unwrap(); - let req2 = iter.next().unwrap(); - - tokio::select! { - result1 = req1 => result1, - result2 = req2 => result2, - } - } - _ => unreachable!("We only add 1 or 2 requests"), - }; - - match result { + match single_raw_request(node.clone(), path, method, headers, body).await { Ok((response, winning_node, latency_ms)) => { let (scheme, host, port) = &winning_node; - let winning_node = format!("{}://{}:{}", scheme, host, port); + let winning_node_display = format!("{}://{}:{}", scheme, host, port); match &jsonrpc_method { - Some(rpc_method) => { - debug!( + Some(rpc_method) => debug!( "{} response from {} ({}ms) - SUCCESS after trying {} nodes! JSON-RPC: {}", - method, winning_node, latency_ms, tried_nodes.len(), rpc_method - ) - } + method, winning_node_display, latency_ms, tried_nodes, rpc_method + ), None => debug!( "{} response from {} ({}ms) - SUCCESS after trying {} nodes!", - method, - winning_node, - latency_ms, - tried_nodes.len() + method, winning_node_display, latency_ms, tried_nodes ), } - record_success(state, scheme, host, *port, latency_ms).await; + + record_success(state, &node.0, &node.1, node.2, latency_ms).await; + return Ok(response); } Err(e) => { - // Since we don't know which specific node failed in the race, - // record the error for all nodes in this batch - for (scheme, host, port) in ¤t_nodes { - let node_display = format!("{}://{}:{}", scheme, host, port); - collected_errors.push((node_display, e.to_string())); - } + collected_errors.push((node_display.clone(), e.to_string())); + debug!( - "Request failed: {} - retrying with different nodes from pool...", - e + "Request failed with node {} with error {} - trying next node...", + node_display, e ); + + record_failure(state, &node.0, &node.1, node.2).await; + continue; } } @@ -421,14 +328,14 @@ async fn race_requests( Some(rpc_method) => error!( "All {} requests failed after trying {} nodes (JSON-RPC: {}). Detailed errors:\n{}", method, - tried_nodes.len(), + tried_nodes, rpc_method, detailed_errors.join("\n") ), None => error!( "All {} requests failed after trying {} nodes. Detailed errors:\n{}", method, - tried_nodes.len(), + tried_nodes, detailed_errors.join("\n") ), } @@ -446,7 +353,7 @@ async fn proxy_request( headers: &HeaderMap, body: Option<&[u8]>, ) -> Response { - match race_requests(state, path, method, headers, body).await { + match sequential_requests(state, path, method, headers, body).await { Ok(res) => res, Err(handler_error) => { let error_response = match &handler_error { @@ -505,7 +412,7 @@ async fn proxy_request( } #[axum::debug_handler] -pub async fn simple_proxy_handler( +pub async fn proxy_handler( State(state): State, method: Method, uri: axum::http::Uri, @@ -553,11 +460,9 @@ pub async fn simple_proxy_handler( } #[axum::debug_handler] -pub async fn simple_stats_handler(State(state): State) -> Response { +pub async fn stats_handler(State(state): State) -> Response { async move { - let node_pool_guard = state.node_pool.read().await; - - match node_pool_guard.get_current_status().await { + match state.node_pool.get_current_status().await { Ok(status) => { let stats_json = serde_json::json!({ "status": "healthy", diff --git a/monero-rpc-pool/src/types.rs b/monero-rpc-pool/src/types.rs new file mode 100644 index 00000000..df536d16 --- /dev/null +++ b/monero-rpc-pool/src/types.rs @@ -0,0 +1,119 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use std::fmt; + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct NodeAddress { + pub scheme: String, // "http" or "https" + pub host: String, + pub port: u16, +} + +impl NodeAddress { + pub fn new(scheme: String, host: String, port: u16) -> Self { + Self { scheme, host, port } + } + + pub fn full_url(&self) -> String { + format!("{}://{}:{}", self.scheme, self.host, self.port) + } +} + +impl fmt::Display for NodeAddress { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}://{}:{}", self.scheme, self.host, self.port) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NodeMetadata { + pub id: i64, + pub network: String, // "mainnet", "stagenet", or "testnet" + pub first_seen_at: DateTime, +} + +impl NodeMetadata { + pub fn new(id: i64, network: String, first_seen_at: DateTime) -> Self { + Self { + id, + network, + first_seen_at, + } + } +} + +/// Health check statistics for a node +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct NodeHealthStats { + pub success_count: i64, + pub failure_count: i64, + pub last_success: Option>, + pub last_failure: Option>, + pub last_checked: Option>, + pub avg_latency_ms: Option, + pub min_latency_ms: Option, + pub max_latency_ms: Option, + pub last_latency_ms: Option, +} + +impl NodeHealthStats { + pub fn success_rate(&self) -> f64 { + let total = self.success_count + self.failure_count; + if total == 0 { + 0.0 + } else { + self.success_count as f64 / total as f64 + } + } + + pub fn reliability_score(&self) -> f64 { + let success_rate = self.success_rate(); + let total_requests = self.success_count + self.failure_count; + + // Weight success rate by total requests (more requests = more reliable data) + let request_weight = (total_requests as f64).min(200.0) / 200.0; + let mut score = success_rate * request_weight; + + // Factor in latency - lower latency = higher score + if let Some(avg_latency) = self.avg_latency_ms { + // Normalize latency to 0-1 range (assuming 0-2000ms range) + let latency_factor = 1.0 - (avg_latency.min(2000.0) / 2000.0); + score = score * 0.8 + latency_factor * 0.2; // 80% success rate, 20% latency + } + + score + } +} + +/// A complete node record combining address, metadata, and health stats +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NodeRecord { + #[serde(flatten)] + pub address: NodeAddress, + #[serde(flatten)] + pub metadata: NodeMetadata, + #[serde(flatten)] + pub health: NodeHealthStats, +} + +impl NodeRecord { + pub fn new(address: NodeAddress, metadata: NodeMetadata, health: NodeHealthStats) -> Self { + Self { + address, + metadata, + health, + } + } + + pub fn full_url(&self) -> String { + self.address.full_url() + } + + pub fn success_rate(&self) -> f64 { + self.health.success_rate() + } + + pub fn reliability_score(&self) -> f64 { + self.health.reliability_score() + } +} diff --git a/swap/src/cli/api.rs b/swap/src/cli/api.rs index bcbed16f..744cb9f6 100644 --- a/swap/src/cli/api.rs +++ b/swap/src/cli/api.rs @@ -7,7 +7,6 @@ use crate::common::tracing_util::Format; use crate::database::{open_db, AccessMode}; use crate::env::{Config as EnvConfig, GetConfig, Mainnet, Testnet}; use crate::fs::system_data_dir; -use crate::monero::wallet_rpc; use crate::monero::Wallets; use crate::network::rendezvous::XmrBtcNamespace; use crate::protocol::Database; @@ -376,48 +375,41 @@ impl ContextBuilder { (), ); - // Handle the different monero configurations + // If we are instructed to use a pool, we start it and use it + // Otherwise we use the single node address provided by the user let (monero_node_address, rpc_pool_handle) = match monero_config { MoneroNodeConfig::Pool => { // Start RPC pool and use it - match monero_rpc_pool::start_server_with_random_port( - monero_rpc_pool::config::Config::new_random_port( - "127.0.0.1".to_string(), - data_dir.join("monero-rpc-pool"), - ), - match self.is_testnet { - true => crate::monero::Network::Stagenet, - false => crate::monero::Network::Mainnet, - }, - ) - .await - { - Ok((server_info, mut status_receiver, pool_handle)) => { - let rpc_url = - format!("http://{}:{}", server_info.host, server_info.port); - tracing::info!("Monero RPC Pool started on {}", rpc_url); + let (server_info, mut status_receiver, pool_handle) = + monero_rpc_pool::start_server_with_random_port( + monero_rpc_pool::config::Config::new_random_port( + "127.0.0.1".to_string(), + data_dir.join("monero-rpc-pool"), + ), + match self.is_testnet { + true => crate::monero::Network::Stagenet, + false => crate::monero::Network::Mainnet, + }, + ) + .await?; - // Start listening for pool status updates and forward them to frontend - if let Some(ref handle) = self.tauri_handle { - let pool_tauri_handle = handle.clone(); - tokio::spawn(async move { - while let Ok(status) = status_receiver.recv().await { - pool_tauri_handle.emit_pool_status_update(status); - } - }); + let rpc_url = + format!("http://{}:{}", server_info.host, server_info.port); + tracing::info!("Monero RPC Pool started on {}", rpc_url); + + // Start listening for pool status updates and forward them to frontend + if let Some(ref handle) = self.tauri_handle { + let pool_tauri_handle = handle.clone(); + tokio::spawn(async move { + while let Ok(status) = status_receiver.recv().await { + pool_tauri_handle.emit_pool_status_update(status); } - - (Some(rpc_url), Some(Arc::new(pool_handle))) - } - Err(e) => { - tracing::error!("Failed to start Monero RPC Pool: {}", e); - (None, None) - } + }); } + + (rpc_url, Some(Arc::new(pool_handle))) } - MoneroNodeConfig::SingleNode { url } => { - (if url.is_empty() { None } else { Some(url) }, None) - } + MoneroNodeConfig::SingleNode { url } => (url, None), }; let wallets = init_monero_wallet( @@ -583,27 +575,16 @@ async fn init_bitcoin_wallet( async fn init_monero_wallet( data_dir: &Path, - monero_daemon_address: impl Into>, + monero_daemon_address: String, env_config: EnvConfig, tauri_handle: Option, ) -> Result> { let network = env_config.monero_network; - - // Use the ./monero/monero-data directory for backwards compatibility let wallet_dir = data_dir.join("monero").join("monero-data"); - let daemon = if let Some(addr) = monero_daemon_address.into() { - monero_sys::Daemon { - address: addr, - ssl: false, - } - } else { - let node = wallet_rpc::choose_monero_node(env_config.monero_network).await?; - tracing::debug!(%node, "Automatically selected monero node"); - monero_sys::Daemon { - address: node.to_string(), - ssl: false, - } + let daemon = monero_sys::Daemon { + address: monero_daemon_address, + ssl: false, }; // This is the name of a wallet we only use for blockchain monitoring