fix(wallet): Backoff for Electrum balancer (#403)

* fix(wallet): Increase request_timeout to 7s, min_retries to 10 for Electrum load balancer

* fix(wallet): Back off for some time before retrying an Electrum request
Mohan 2025-06-12 18:14:41 +02:00 committed by GitHub
parent 5f58669915
commit 0fb12fd240


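The diff below wires the `backoff` crate's `ExponentialBackoff` and `retry_notify` into `call_sync`. For orientation, here is a minimal, self-contained sketch of that retry pattern using the same policy values (100 ms initial interval, 1.5 s cap, no elapsed-time limit); `fetch_tip_height` and the fixed retry budget of 3 are hypothetical stand-ins, not the project's code.

```rust
use std::time::Duration;

use backoff::{Error as BackoffError, ExponentialBackoff};

// Hypothetical stand-in for an Electrum call that may fail transiently.
fn fetch_tip_height() -> Result<u64, String> {
    Err("server unreachable".to_string())
}

fn main() {
    // Same shape of policy as the diff: start at 100 ms, cap at 1.5 s,
    // and bound the number of attempts ourselves instead of via a time limit.
    let policy = ExponentialBackoff {
        initial_interval: Duration::from_millis(100),
        max_interval: Duration::from_millis(1500),
        max_elapsed_time: None,
        ..ExponentialBackoff::default()
    };

    let allowed_retries = 3; // hypothetical budget; the diff derives it from config
    let mut attempts = 0;

    let result = backoff::retry_notify(
        policy,
        || {
            attempts += 1;
            if attempts > allowed_retries {
                // A permanent error stops the retry loop immediately.
                return Err(BackoffError::permanent("retry budget exhausted".to_string()));
            }
            // A transient error tells `retry_notify` to back off and try again.
            fetch_tip_height().map_err(BackoffError::transient)
        },
        // The notify hook runs before each backoff sleep; the diff uses it to
        // rotate to the next Electrum client.
        |err, wait: Duration| eprintln!("attempt failed ({err}), backing off for {wait:?}"),
    );

    match result {
        Ok(height) => println!("tip height: {height}"),
        Err(e) => eprintln!("gave up: {e:?}"),
    }
}
```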
@@ -1,3 +1,4 @@
use backoff::{Error as BackoffError, ExponentialBackoff};
use bdk_electrum::electrum_client::{Client, ConfigBuilder, ElectrumApi, Error};
use bdk_electrum::BdkElectrumClient;
use bitcoin::Transaction;
@@ -5,6 +6,7 @@ use futures::future::join_all;
use once_cell::sync::OnceCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use std::time::Instant;
use tokio::task::spawn_blocking;
use tracing::{debug, error, instrument, trace, warn};
@@ -228,7 +230,8 @@ where
/// If the closure returns an I/O error or certificate error the balancer will try the next
/// node until all nodes have been exhausted. The last encountered error
/// is returned in that case.
/// Now returns `MultiError` containing all individual failures, which can be inspected
///
/// Returns `MultiError` containing all individual failures, which can be inspected
/// by the caller or automatically converted to a single `Error` for compatibility.
#[instrument(level = "debug", skip(self, f), fields(operation = kind, total_clients = self.client_count(), min_retries = self.config.min_retries))]
fn call_sync<F, T>(&self, kind: &str, mut f: F) -> Result<T, MultiError>
@@ -237,38 +240,41 @@ where
{
let num_clients = self.client_count();
let mut errors = Vec::new();
let mut attempts = 0;
// Try all electrum clients at least once, or min_retries (whichever is higher)
let total_attempts = std::cmp::max(self.config.min_retries, num_clients);
let allowed_retries = std::cmp::max(self.config.min_retries, num_clients);
for attempt in 0..total_attempts {
attempts += 1;
// Configure exponential backoff
let backoff_policy = ExponentialBackoff {
initial_interval: Duration::from_millis(100),
// 1.5 seconds
max_interval: Duration::from_millis(1500),
// We handle total attempts ourselves
max_elapsed_time: None,
..ExponentialBackoff::default()
};
// Get current index without incrementing (sticky behavior)
let operation_with_backoff = || {
if errors.len() >= allowed_retries {
return Err(BackoffError::permanent(()));
}
// Get current index without incrementing
let idx = self.next.load(Ordering::SeqCst);
// Get client for this index (will initialize if needed)
let client = match self.get_or_init_client_sync(idx) {
Ok(client) => client,
Err(e) => {
trace!(
server_url = self.urls[idx],
attempt = attempt + 1,
error = ?e,
"Client initialization failed, switching to next client"
);
errors.push(e);
// Get client for this index
let client = self.get_or_init_client_sync(idx).map_err(|e| {
trace!(
server_url = self.urls[idx],
attempt = errors.len(),
error = ?e,
"Client initialization failed, switching to next client"
);
// Only advance to next client on failure
self.next
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
Some((current + 1) % num_clients)
})
.expect("fetch_update should never fail");
continue;
}
};
errors.push(e);
BackoffError::transient(())
})?;
// Execute the request synchronously
let start = Instant::now();
@@ -276,51 +282,68 @@ where
Ok(res) => {
trace!(
server_url = self.urls[idx],
attempt = attempt + 1,
attempt = errors.len(),
duration_ms = start.elapsed().as_millis(),
"Electrum operation successful (staying with this client)"
);
return Ok(res);
Ok(res)
}
Err(e) => {
Err(err) => {
trace!(
server_url = self.urls[idx],
attempt = attempt + 1,
attempt = errors.len(),
duration_ms = start.elapsed().as_millis(),
error = ?e,
error = ?err,
"Electrum operation failed, switching to next client"
);
errors.push(e);
// Only advance to next client on failure
self.next
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
Some((current + 1) % num_clients)
})
.expect("fetch_update should never fail");
continue;
errors.push(err);
return Err(BackoffError::transient(()));
}
}
};
// Use backoff::retry for the retry logic with exponential backoff
match backoff::retry_notify(
backoff_policy,
operation_with_backoff,
|_: (), duration: Duration| {
trace!(
backoff_duration_ms = duration.as_millis(),
"Backing off before retry"
);
// Advance to next client on failure
self.next
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
Some((current + 1) % num_clients)
})
.expect("fetch_update should never fail");
},
) {
Ok(result) => Ok(result),
Err(_) => {
warn!(
operation = kind,
attempts = errors.len(),
total_attempts = allowed_retries,
total_clients = self.client_count(),
error_count = errors.len(),
all_errors = ?errors,
"All Electrum clients failed after exhausting retry attempts with backoff"
);
let context = format!(
"All {} Electrum clients failed after {} attempts for operation '{}'",
self.client_count(),
errors.len(),
kind
);
Err(MultiError::new(errors, context))
}
}
warn!(
operation = kind,
attempts = attempts,
total_attempts = total_attempts,
total_clients = self.client_count(),
error_count = errors.len(),
all_errors = ?errors,
"All Electrum clients failed after exhausting retry attempts"
);
let context = format!(
"All {} Electrum clients failed after {} attempts for operation '{}'",
self.client_count(),
attempts,
kind
);
Err(MultiError::new(errors, context))
}
/// Execute the given closure on **all** Electrum nodes in parallel.
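For context on the `MultiError` referenced in the `call_sync` doc comment above, here is a rough sketch of such an aggregate error type, assuming it simply wraps each per-client `electrum_client` error plus a context string. The repository's actual definition, and its conversion back into a single `Error` for compatibility, may well differ.

```rust
use bdk_electrum::electrum_client::Error;

/// Sketch of an aggregate error in the spirit of `MultiError`: every
/// per-client failure plus a human-readable context string.
/// (Field names and impls here are assumptions, not the project's code.)
#[derive(Debug)]
pub struct MultiError {
    context: String,
    errors: Vec<Error>,
}

impl MultiError {
    pub fn new(errors: Vec<Error>, context: String) -> Self {
        Self { context, errors }
    }

    /// The individual failures, one per attempted client, for callers
    /// that want to inspect them.
    pub fn errors(&self) -> &[Error] {
        &self.errors
    }
}

impl std::fmt::Display for MultiError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{} ({} underlying errors)", self.context, self.errors.len())
    }
}

impl std::error::Error for MultiError {}
```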