fix(monero-rpc-pool): Keep tasks alive, display retry rate last 200 ops (#423)

* fix(monero-rpc-pool): Keep background tasks alive, display retry rate last 200 ops

* refactors

* if moneor node is offline, use pool

* refactors
This commit is contained in:
Mohan 2025-06-19 23:08:50 +02:00 committed by GitHub
parent 3cb2d907f9
commit b72925ca18
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 147 additions and 116 deletions

View file

@ -1,6 +1,6 @@
{
"db_name": "SQLite",
"query": "\n SELECT \n CAST(SUM(CASE WHEN hc.was_successful THEN 1 ELSE 0 END) AS INTEGER) as \"successful!: i64\",\n CAST(SUM(CASE WHEN NOT hc.was_successful THEN 1 ELSE 0 END) AS INTEGER) as \"unsuccessful!: i64\"\n FROM health_checks hc\n JOIN monero_nodes n ON hc.node_id = n.id\n WHERE n.network = ?\n ",
"query": "\n SELECT \n CAST(SUM(CASE WHEN hc.was_successful THEN 1 ELSE 0 END) AS INTEGER) as \"successful!: i64\",\n CAST(SUM(CASE WHEN NOT hc.was_successful THEN 1 ELSE 0 END) AS INTEGER) as \"unsuccessful!: i64\"\n FROM (\n SELECT hc.was_successful\n FROM health_checks hc\n JOIN monero_nodes n ON hc.node_id = n.id\n WHERE n.network = ?\n ORDER BY hc.timestamp DESC\n LIMIT 100\n ) hc\n ",
"describe": {
"columns": [
{
@ -19,5 +19,5 @@
},
"nullable": [true, true]
},
"hash": "132666c849bf0db14e50ef41f429e17b7c1afd21031edf3af40fadfb79ef2597"
"hash": "d32d91ca2debc4212841282533482b2ff081234c7f9f848a7223ae04234995d9"
}

View file

@ -481,9 +481,14 @@ impl Database {
SELECT
CAST(SUM(CASE WHEN hc.was_successful THEN 1 ELSE 0 END) AS INTEGER) as "successful!: i64",
CAST(SUM(CASE WHEN NOT hc.was_successful THEN 1 ELSE 0 END) AS INTEGER) as "unsuccessful!: i64"
FROM health_checks hc
JOIN monero_nodes n ON hc.node_id = n.id
WHERE n.network = ?
FROM (
SELECT hc.was_successful
FROM health_checks hc
JOIN monero_nodes n ON hc.node_id = n.id
WHERE n.network = ?
ORDER BY hc.timestamp DESC
LIMIT 100
) hc
"#,
network
)

View file

@ -37,12 +37,12 @@ pub struct AppState {
}
/// Manages background tasks for the RPC pool
pub struct TaskManager {
pub struct PoolHandle {
pub status_update_handle: JoinHandle<()>,
pub discovery_handle: JoinHandle<()>,
}
impl Drop for TaskManager {
impl Drop for PoolHandle {
fn drop(&mut self) {
self.status_update_handle.abort();
self.discovery_handle.abort();
@ -62,7 +62,7 @@ async fn create_app_with_receiver(
) -> Result<(
Router,
tokio::sync::broadcast::Receiver<PoolStatus>,
TaskManager,
PoolHandle,
)> {
// Initialize database
let db = Database::new_with_data_dir(config.data_dir.clone()).await?;
@ -75,17 +75,27 @@ async fn create_app_with_receiver(
// Initialize discovery service
let discovery = NodeDiscovery::new(db.clone())?;
// Publish initial status immediately to ensure first event is sent
{
let pool_guard = node_pool.read().await;
if let Err(e) = pool_guard.publish_status_update().await {
error!("Failed to publish initial status update: {}", e);
}
}
// Start background tasks
let node_pool_for_health_check = node_pool.clone();
let status_update_handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
loop {
// Publish status update after health check
interval.tick().await;
// Publish status update
let pool_guard = node_pool_for_health_check.read().await;
if let Err(e) = pool_guard.publish_status_update().await {
error!("Failed to publish status update after health check: {}", e);
error!("Failed to publish status update: {}", e);
}
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
});
@ -102,7 +112,7 @@ async fn create_app_with_receiver(
}
});
let task_manager = TaskManager {
let pool_handle = PoolHandle {
status_update_handle,
discovery_handle,
};
@ -116,12 +126,12 @@ async fn create_app_with_receiver(
.layer(CorsLayer::permissive())
.with_state(app_state);
Ok((app, status_receiver, task_manager))
Ok((app, status_receiver, pool_handle))
}
pub async fn create_app(config: Config, network: Network) -> Result<Router> {
let (app, _, _task_manager) = create_app_with_receiver(config, network).await?;
// Note: task_manager is dropped here, so tasks will be aborted when this function returns
let (app, _, _pool_handle) = create_app_with_receiver(config, network).await?;
// Note: pool_handle is dropped here, so tasks will be aborted when this function returns
// This is intentional for the simple create_app use case
Ok(app)
}
@ -160,14 +170,14 @@ pub async fn run_server_with_data_dir(
}
/// Start a server with a random port for library usage
/// Returns the server info with the actual port used, a receiver for pool status updates, and task manager
/// Returns the server info with the actual port used, a receiver for pool status updates, and pool handle
pub async fn start_server_with_random_port(
config: Config,
network: Network,
) -> Result<(
ServerInfo,
tokio::sync::broadcast::Receiver<PoolStatus>,
TaskManager,
PoolHandle,
)> {
// Clone the host before moving config
let host = config.host.clone();
@ -175,7 +185,7 @@ pub async fn start_server_with_random_port(
// If port is 0, the system will assign a random available port
let config_with_random_port = Config::new_random_port(config.host, config.data_dir);
let (app, status_receiver, task_manager) =
let (app, status_receiver, pool_handle) =
create_app_with_receiver(config_with_random_port, network).await?;
// Bind to port 0 to get a random available port
@ -199,11 +209,11 @@ pub async fn start_server_with_random_port(
}
});
Ok((server_info, status_receiver, task_manager))
Ok((server_info, status_receiver, pool_handle))
}
/// Start a server with a random port and custom data directory for library usage
/// Returns the server info with the actual port used, a receiver for pool status updates, and task manager
/// Returns the server info with the actual port used, a receiver for pool status updates, and pool handle
pub async fn start_server_with_random_port_and_data_dir(
config: Config,
network: Network,
@ -211,7 +221,7 @@ pub async fn start_server_with_random_port_and_data_dir(
) -> Result<(
ServerInfo,
tokio::sync::broadcast::Receiver<PoolStatus>,
TaskManager,
PoolHandle,
)> {
let config_with_data_dir = Config::new_random_port(config.host, data_dir);
start_server_with_random_port(config_with_data_dir, network).await

View file

@ -64,7 +64,7 @@ export default function MoneroPoolHealthBox() {
size="small"
/>
<Chip
label={`${(100 - overallSuccessRate).toFixed(1)}% Retry Rate`}
label={`${(100 - overallSuccessRate).toFixed(1)}% Retry Rate (last 200 operations)`}
color={
overallSuccessRate > 80
? "success"
@ -182,25 +182,6 @@ export default function MoneroPoolHealthBox() {
additionalContent={
<Box sx={{ display: "flex", flexDirection: "column", gap: 2 }}>
{poolStatus && renderHealthSummary()}
{poolStatus && (
<Box>
<Typography variant="body2" sx={{ mb: 1, fontWeight: "medium" }}>
Health Check Statistics
</Typography>
<Box sx={{ display: "flex", gap: 2, flexWrap: "wrap" }}>
<Typography variant="caption" color="text.secondary">
Successful:{" "}
{poolStatus.successful_health_checks.toLocaleString()}
</Typography>
<Typography variant="caption" color="text.secondary">
Failed:{" "}
{poolStatus.unsuccessful_health_checks.toLocaleString()}
</Typography>
</Box>
</Box>
)}
<Box>{renderTopNodes()}</Box>
</Box>
}

View file

@ -223,21 +223,30 @@ export async function initializeContext() {
const bitcoinNodes =
store.getState().settings.nodes[network][Blockchain.Bitcoin];
// For Monero nodes, get the configured node URL and pool setting
// For Monero nodes, determine whether to use pool or custom node
const useMoneroRpcPool = store.getState().settings.useMoneroRpcPool;
const moneroNodes =
store.getState().settings.nodes[network][Blockchain.Monero];
// Always pass the first configured monero node URL directly without checking availability
// The backend will handle whether to use the pool or the custom node
const moneroNode = moneroNodes.length > 0 ? moneroNodes[0] : null;
const moneroNodeUrl =
store.getState().settings.nodes[network][Blockchain.Monero][0] ?? null;
// Check the state of the Monero node
const isMoneroNodeOnline = await getMoneroNodeStatus(moneroNodeUrl, network);
const moneroNodeConfig =
useMoneroRpcPool || moneroNodeUrl == null || !isMoneroNodeOnline
? { type: "Pool" as const }
: {
type: "SingleNode" as const,
content: {
url: moneroNodeUrl,
},
};
// Initialize Tauri settings
const tauriSettings: TauriSettings = {
electrum_rpc_urls: bitcoinNodes,
monero_node_url: moneroNode,
monero_node_config: moneroNodeConfig,
use_tor: useTor,
use_monero_rpc_pool: useMoneroRpcPool,
};
logger.info("Initializing context with settings", tauriSettings);

View file

@ -17,7 +17,7 @@ use swap::cli::{
tauri_bindings::{TauriContextStatusEvent, TauriEmitter, TauriHandle, TauriSettings},
Context, ContextBuilder,
},
command::{Bitcoin, Monero},
command::Bitcoin,
};
use tauri::{async_runtime::RwLock, Manager, RunEvent};
use tauri_plugin_dialog::DialogExt;
@ -367,48 +367,6 @@ async fn initialize_context(
.context("Context is already being initialized")
.to_string_result()?;
// Determine which Monero node to use:
// - If using RPC pool, start and use the local RPC pool
// - Otherwise, use the provided node URL directly (even if empty)
let monero_node_url = if settings.use_monero_rpc_pool {
// Start RPC pool and use it
let data_dir = data::data_dir_from(None, testnet).to_string_result()?;
match monero_rpc_pool::start_server_with_random_port(
monero_rpc_pool::config::Config::new_random_port(
"127.0.0.1".to_string(),
data_dir.join("monero-rpc-pool"),
),
match testnet {
true => swap::monero::Network::Stagenet,
false => swap::monero::Network::Mainnet,
},
)
.await
{
Ok((server_info, mut status_receiver, _task_manager)) => {
let rpc_url = format!("http://{}:{}", server_info.host, server_info.port);
tracing::info!("Monero RPC Pool started on {}", rpc_url);
// Start listening for pool status updates and forward them to frontend
let pool_tauri_handle = TauriHandle::new(app_handle.clone());
tauri::async_runtime::spawn(async move {
while let Ok(status) = status_receiver.recv().await {
pool_tauri_handle.emit_pool_status_update(status);
}
});
rpc_url.parse().ok()
}
Err(e) => {
tracing::error!("Failed to start Monero RPC Pool: {}", e);
None
}
}
} else {
// Use the provided node URL directly without checking availability
settings.monero_node_url.clone()
};
// Get app handle and create a Tauri handle
let tauri_handle = TauriHandle::new(app_handle.clone());
@ -420,9 +378,7 @@ async fn initialize_context(
bitcoin_electrum_rpc_urls: settings.electrum_rpc_urls.clone(),
bitcoin_target_block: None,
})
.with_monero(Monero {
monero_node_address: monero_node_url,
})
.with_monero(settings.monero_node_config)
.with_json(false)
.with_debug(true)
.with_tor(settings.use_tor)

View file

@ -483,7 +483,7 @@ async fn init_monero_wallet(
// Start the monero-rpc-pool and use it
tracing::info!("Starting Monero RPC Pool for ASB");
let (server_info, _status_receiver, _task_manager) =
let (server_info, _status_receiver, _pool_handle) =
monero_rpc_pool::start_server_with_random_port(
monero_rpc_pool::config::Config::new_random_port(
"127.0.0.1".to_string(),

View file

@ -20,7 +20,9 @@ use std::fmt;
use std::future::Future;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Once};
use tauri_bindings::{TauriBackgroundProgress, TauriContextStatusEvent, TauriEmitter, TauriHandle};
use tauri_bindings::{
MoneroNodeConfig, TauriBackgroundProgress, TauriContextStatusEvent, TauriEmitter, TauriHandle,
};
use tokio::sync::{broadcast, broadcast::Sender, Mutex as TokioMutex, RwLock};
use tokio::task::JoinHandle;
use tor_rtcompat::tokio::TokioRustlsRuntime;
@ -188,12 +190,13 @@ pub struct Context {
bitcoin_wallet: Option<Arc<bitcoin::Wallet>>,
monero_manager: Option<Arc<monero::Wallets>>,
tor_client: Option<Arc<TorClient<TokioRustlsRuntime>>>,
monero_rpc_pool_handle: Option<Arc<monero_rpc_pool::PoolHandle>>,
}
/// A conveniant builder struct for [`Context`].
#[must_use = "ContextBuilder must be built to be useful"]
pub struct ContextBuilder {
monero: Option<Monero>,
monero_config: Option<MoneroNodeConfig>,
bitcoin: Option<Bitcoin>,
data: Option<PathBuf>,
is_testnet: bool,
@ -216,7 +219,7 @@ impl ContextBuilder {
/// Basic builder with default options for mainnet
pub fn mainnet() -> Self {
ContextBuilder {
monero: None,
monero_config: None,
bitcoin: None,
data: None,
is_testnet: false,
@ -235,8 +238,8 @@ impl ContextBuilder {
}
/// Configures the Context to initialize a Monero wallet with the given configuration.
pub fn with_monero(mut self, monero: impl Into<Option<Monero>>) -> Self {
self.monero = monero.into();
pub fn with_monero(mut self, monero_config: impl Into<Option<MoneroNodeConfig>>) -> Self {
self.monero_config = monero_config.into();
self
}
@ -247,8 +250,8 @@ impl ContextBuilder {
}
/// Attach a handle to Tauri to the Context for emitting events etc.
pub fn with_tauri(mut self, tauri: impl Into<Option<TauriHandle>>) -> Self {
self.tauri_handle = tauri.into();
pub fn with_tauri(mut self, tauri_handle: impl Into<Option<TauriHandle>>) -> Self {
self.tauri_handle = tauri_handle.into();
self
}
@ -364,17 +367,61 @@ impl ContextBuilder {
};
let initialize_monero_wallet = async {
match self.monero {
Some(monero) => {
match self.monero_config {
Some(monero_config) => {
let monero_progress_handle = tauri_handle
.new_background_process_with_initial_progress(
TauriBackgroundProgress::OpeningMoneroWallet,
(),
);
// Handle the different monero configurations
let (monero_node_address, rpc_pool_handle) = match monero_config {
MoneroNodeConfig::Pool => {
// Start RPC pool and use it
match monero_rpc_pool::start_server_with_random_port(
monero_rpc_pool::config::Config::new_random_port(
"127.0.0.1".to_string(),
data_dir.join("monero-rpc-pool"),
),
match self.is_testnet {
true => crate::monero::Network::Stagenet,
false => crate::monero::Network::Mainnet,
},
)
.await
{
Ok((server_info, mut status_receiver, pool_handle)) => {
let rpc_url =
format!("http://{}:{}", server_info.host, server_info.port);
tracing::info!("Monero RPC Pool started on {}", rpc_url);
// Start listening for pool status updates and forward them to frontend
if let Some(ref handle) = self.tauri_handle {
let pool_tauri_handle = handle.clone();
tokio::spawn(async move {
while let Ok(status) = status_receiver.recv().await {
pool_tauri_handle.emit_pool_status_update(status);
}
});
}
(Some(rpc_url), Some(Arc::new(pool_handle)))
}
Err(e) => {
tracing::error!("Failed to start Monero RPC Pool: {}", e);
(None, None)
}
}
}
MoneroNodeConfig::SingleNode { url } => {
(if url.is_empty() { None } else { Some(url) }, None)
}
};
let wallets = init_monero_wallet(
data_dir.as_path(),
monero.monero_node_address.map(|url| url.to_string()),
monero_node_address,
env_config,
tauri_handle.clone(),
)
@ -382,9 +429,9 @@ impl ContextBuilder {
monero_progress_handle.finish();
Ok(Some(wallets))
Ok((Some(wallets), rpc_pool_handle))
}
None => Ok(None),
None => Ok((None, None)),
}
};
@ -405,7 +452,7 @@ impl ContextBuilder {
Ok(maybe_tor_client)
};
let (bitcoin_wallet, monero_manager, tor) = tokio::try_join!(
let (bitcoin_wallet, (monero_manager, monero_rpc_pool_handle), tor) = tokio::try_join!(
initialize_bitcoin_wallet,
initialize_monero_wallet,
initialize_tor_client,
@ -443,6 +490,7 @@ impl ContextBuilder {
tasks,
tauri_handle: self.tauri_handle,
tor_client: tor,
monero_rpc_pool_handle,
};
Ok(context)
@ -476,6 +524,7 @@ impl Context {
tasks: PendingTaskList::default().into(),
tauri_handle: None,
tor_client: None,
monero_rpc_pool_handle: None,
}
}
@ -637,6 +686,23 @@ impl Config {
}
}
impl From<Monero> for MoneroNodeConfig {
fn from(monero: Monero) -> Self {
match monero.monero_node_address {
Some(url) => MoneroNodeConfig::SingleNode {
url: url.to_string(),
},
None => MoneroNodeConfig::Pool,
}
}
}
impl From<Monero> for Option<MoneroNodeConfig> {
fn from(monero: Monero) -> Self {
Some(MoneroNodeConfig::from(monero))
}
}
#[cfg(test)]
pub mod api_test {
use super::*;

View file

@ -13,7 +13,6 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use strum::Display;
use tokio::sync::{oneshot, Mutex as TokioMutex};
use typeshare::typeshare;
use url::Url;
use uuid::Uuid;
#[typeshare]
@ -703,19 +702,24 @@ pub enum BackgroundRefundState {
Completed,
}
#[typeshare]
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type", content = "content")]
pub enum MoneroNodeConfig {
Pool,
SingleNode { url: String },
}
/// This struct contains the settings for the Context
#[typeshare]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TauriSettings {
/// The URL of the Monero node e.g `http://xmr.node:18081`
#[typeshare(serialized_as = "Option<string>")]
pub monero_node_url: Option<Url>,
/// Configuration for Monero node connection
pub monero_node_config: MoneroNodeConfig,
/// The URLs of the Electrum RPC servers e.g `["ssl://bitcoin.com:50001", "ssl://backup.com:50001"]`
pub electrum_rpc_urls: Vec<String>,
/// Whether to initialize and use a tor client.
pub use_tor: bool,
/// Whether to use the Monero RPC pool instead of custom nodes.
pub use_monero_rpc_pool: bool,
}
#[typeshare]