feat(wallet): Retry logic for Monero wallet (#417)

* add retries to monero-sys, also more logs

* just fmt, satisfy clippy

* change log msg
This commit is contained in:
Raphael 2025-06-17 14:07:37 +02:00 committed by GitHub
parent 2e6d324ab8
commit cf669a87d8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 157 additions and 119 deletions

74
.vscode/settings.json vendored
View file

@ -1,74 +0,0 @@
{
"files.associations": {
"optional": "cpp",
"vector": "cpp",
"__bit_reference": "cpp",
"__hash_table": "cpp",
"__locale": "cpp",
"__node_handle": "cpp",
"__split_buffer": "cpp",
"__threading_support": "cpp",
"__tree": "cpp",
"__verbose_abort": "cpp",
"any": "cpp",
"array": "cpp",
"bitset": "cpp",
"cctype": "cpp",
"cfenv": "cpp",
"charconv": "cpp",
"cinttypes": "cpp",
"clocale": "cpp",
"cmath": "cpp",
"codecvt": "cpp",
"complex": "cpp",
"condition_variable": "cpp",
"csignal": "cpp",
"cstdarg": "cpp",
"cstddef": "cpp",
"cstdint": "cpp",
"cstdio": "cpp",
"cstdlib": "cpp",
"cstring": "cpp",
"ctime": "cpp",
"cwchar": "cpp",
"cwctype": "cpp",
"deque": "cpp",
"execution": "cpp",
"memory": "cpp",
"forward_list": "cpp",
"fstream": "cpp",
"future": "cpp",
"initializer_list": "cpp",
"iomanip": "cpp",
"ios": "cpp",
"iosfwd": "cpp",
"iostream": "cpp",
"istream": "cpp",
"limits": "cpp",
"list": "cpp",
"locale": "cpp",
"map": "cpp",
"mutex": "cpp",
"new": "cpp",
"ostream": "cpp",
"print": "cpp",
"queue": "cpp",
"ratio": "cpp",
"regex": "cpp",
"set": "cpp",
"span": "cpp",
"sstream": "cpp",
"stack": "cpp",
"stdexcept": "cpp",
"streambuf": "cpp",
"string": "cpp",
"string_view": "cpp",
"tuple": "cpp",
"typeinfo": "cpp",
"unordered_map": "cpp",
"unordered_set": "cpp",
"variant": "cpp",
"algorithm": "cpp",
"*.rs": "cpp"
}
}

1
Cargo.lock generated
View file

@ -5820,6 +5820,7 @@ name = "monero-sys"
version = "0.1.0"
dependencies = [
"anyhow",
"backoff",
"cmake",
"cxx",
"cxx-build",

View file

@ -5,6 +5,7 @@ edition = "2021"
[dependencies]
anyhow = "1.0.98"
backoff = "0.4.0"
cxx = "1.0.137"
monero = { version = "0.12", features = ["serde_support"] }
tokio = { version = "1.44.2", features = ["sync", "time", "rt"] }

View file

@ -164,6 +164,11 @@ namespace Monero
{
return std::make_unique<std::vector<std::string>>(tx.txid());
}
inline std::unique_ptr<std::string> walletFilename(const Wallet &wallet)
{
return std::make_unique<std::string>(wallet.filename());
}
}
#include "easylogging++.h"

View file

@ -126,6 +126,9 @@ pub mod ffi {
/// Get the path of the wallet.
fn walletPath(wallet: &Wallet) -> Result<UniquePtr<CxxString>>;
/// Get the filename of the wallet.
fn walletFilename(wallet: &Wallet) -> Result<UniquePtr<CxxString>>;
/// Get the status of the wallet and an error string if there is one.
fn statusWithErrorString(
self: &Wallet,

View file

@ -14,9 +14,11 @@ mod bridge;
use std::{
any::Any, cmp::Ordering, fmt::Display, ops::Deref, path::PathBuf, pin::Pin, str::FromStr,
time::Duration,
};
use anyhow::{bail, Context, Result};
use anyhow::{anyhow, bail, Context, Result};
use backoff::{future, retry_notify};
use cxx::let_cxx_string;
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
@ -376,14 +378,31 @@ impl WalletHandle {
amount: monero::Amount,
) -> anyhow::Result<TxReceipt> {
let address = *address;
self.call(move |wallet| wallet.transfer(&address, amount))
.await
future::retry_notify(backoff(None, None), || async {
self.call(move |wallet| wallet.transfer(&address, amount))
.await
.map_err(backoff::Error::transient)
}, |error, duration: Duration| {
tracing::error!(error=%error, "Failed to transfer funds, retrying in {} secs", duration.as_secs());
})
.await
.map_err(|e| anyhow!("Failed to transfer funds after multiple attempts: {e}"))
}
/// Sweep all funds to an address.
pub async fn sweep(&self, address: &monero::Address) -> anyhow::Result<Vec<String>> {
let address = *address;
self.call(move |wallet| wallet.sweep(&address)).await
future::retry_notify(backoff(None, None), || async {
self.call(move |wallet| wallet.sweep(&address))
.await
.map_err(backoff::Error::transient)
}, |error, duration: Duration| {
tracing::error!(error=%error, "Failed to sweep funds, retrying in {} secs", duration.as_secs());
})
.await
.map_err(|e| anyhow!("Failed to sweep funds after multiple attempts: {e}"))
}
/// Get the seed of the wallet.
@ -472,8 +491,8 @@ impl WalletHandle {
// Initiate the sync (make sure to drop the lock right after)
{
self.call(move |wallet| {
wallet.start_refresh();
wallet.refresh_async();
wallet.start_refresh_thread();
wallet.force_background_refresh();
})
.await;
tracing::debug!("Wallet refresh initiated");
@ -530,7 +549,7 @@ impl WalletHandle {
tokio::time::sleep(std::time::Duration::from_millis(POLL_INTERVAL_MILLIS)).await;
}
tracing::debug!("Wallet synced");
tracing::info!("Wallet synced");
Ok(())
}
@ -563,6 +582,8 @@ impl WalletHandle {
confirmations: u64,
listener: Option<impl Fn(u64) + Send + 'static>,
) -> anyhow::Result<()> {
tracing::info!(%txid, %destination_address, amount=%expected_amount, %confirmations, "Waiting until transaction is confirmed");
const DEFAULT_CHECK_INTERVAL_SECS: u64 = 15;
let mut poll_interval = tokio::time::interval(tokio::time::Duration::from_secs(
@ -610,6 +631,8 @@ impl WalletHandle {
if tx_status.confirmations >= confirmations {
break;
}
tracing::trace!("Transaction not confirmed yet, polling again later");
}
// Signal success
@ -696,6 +719,8 @@ impl WalletManager {
background_sync: bool,
daemon: Daemon,
) -> anyhow::Result<FfiWallet> {
tracing::debug!(%path, "Opening or creating wallet");
// If we haven't loaded the wallet, but it already exists, open it.
if self.wallet_exists(path) {
tracing::debug!(wallet=%path, "Wallet already exists, opening it");
@ -752,6 +777,8 @@ impl WalletManager {
background_sync: bool,
daemon: Daemon,
) -> Result<FfiWallet> {
tracing::debug!(%path, "Creating wallet from keys");
if self.wallet_exists(path) {
tracing::info!(wallet=%path, "Wallet already exists, opening it");
@ -825,6 +852,8 @@ impl WalletManager {
background_sync: bool,
daemon: Daemon,
) -> anyhow::Result<FfiWallet> {
tracing::debug!(%path, "Recovering wallet from seed");
let_cxx_string!(path = path);
let_cxx_string!(password = password.unwrap_or(""));
let_cxx_string!(mnemonic = mnemonic);
@ -854,6 +883,8 @@ impl WalletManager {
/// Close a wallet, storing the wallet state.
fn close_wallet(&mut self, wallet: &mut FfiWallet) -> anyhow::Result<()> {
tracing::info!(wallet=%wallet.filename(), "Closing wallet");
// Safety: we know we have a valid, unique pointer to the wallet
let success = unsafe { self.inner.pinned().closeWallet(wallet.inner.inner, true) }
.context("Failed to close wallet: Ffi call failed with exception")?;
@ -876,6 +907,8 @@ impl WalletManager {
background_sync: bool,
daemon: Daemon,
) -> anyhow::Result<FfiWallet> {
tracing::debug!(%path, "Opening wallet");
let_cxx_string!(path = path);
let_cxx_string!(password = password.unwrap_or(""));
let network_type = network_type.into();
@ -919,6 +952,8 @@ impl WalletManager {
/// Check if a wallet exists at the given path.
pub fn wallet_exists(&mut self, path: &str) -> bool {
tracing::debug!(%path, "Checking if wallet exists");
let_cxx_string!(path = path);
self.inner
.pinned()
@ -964,10 +999,17 @@ impl FfiWallet {
tracing::debug!(address=%wallet.main_address(), "Initializing wallet");
wallet
.init(&daemon.address, daemon.ssl)
.context("Failed to initialize wallet")?;
retry_notify(
backoff(None, None),
|| {
wallet
.init(&daemon.address, daemon.ssl)
.context("Failed to initialize wallet")
.map_err(backoff::Error::transient)
},
|e, duration: Duration| tracing::error!(error=%e, "Failed to initialize wallet, retrying in {} secs", duration.as_secs()),
)
.map_err(|e| anyhow!("Failed to initialize wallet: {e}"))?;
tracing::debug!("Initialized wallet, setting daemon address");
wallet.set_daemon_address(&daemon.address)?;
@ -975,8 +1017,8 @@ impl FfiWallet {
if background_sync {
tracing::debug!("Background sync enabled, starting refresh thread");
wallet.start_refresh();
wallet.refresh_async();
wallet.start_refresh_thread();
wallet.force_background_refresh();
}
// Check for errors on general principles
@ -993,6 +1035,14 @@ impl FfiWallet {
.to_string()
}
/// Get the filename of the wallet.
pub fn filename(&self) -> String {
ffi::walletFilename(&self.inner)
.context("Failed to get wallet filename: FFI call failed with exception")
.expect("Wallet filename should never fail")
.to_string()
}
/// Get the address for the given account and address index.
/// address(0, 0) is the main address.
/// We don't use anything besides the main address so this is a private method (for now).
@ -1005,6 +1055,8 @@ impl FfiWallet {
}
fn set_daemon_address(&mut self, address: &str) -> anyhow::Result<()> {
tracing::debug!(%address, "Setting daemon address");
let_cxx_string!(address = address);
let raw_wallet = &mut self.inner;
@ -1027,6 +1079,8 @@ impl FfiWallet {
/// Initialize the wallet and download initial values from the remote node.
/// Does not actuallyt sync the wallet, use any of the refresh methods to do that.
fn init(&mut self, daemon_address: &str, ssl: bool) -> anyhow::Result<()> {
tracing::debug!(%daemon_address, %ssl, "Initializing wallet");
let_cxx_string!(daemon_address = daemon_address);
let_cxx_string!(daemon_username = "");
let_cxx_string!(daemon_password = "");
@ -1070,7 +1124,11 @@ impl FfiWallet {
return SyncProgress::zero();
}
SyncProgress::new(current_block, target_block)
let progress = SyncProgress::new(current_block, target_block);
tracing::trace!(%progress, "Sync progress");
progress
}
fn connected(&self) -> bool {
@ -1080,15 +1138,24 @@ impl FfiWallet {
.context("Failed to get connection status: FFI call failed with exception")
.expect("Shouldn't panic")
{
ffi::ConnectionStatus::Connected => true,
ffi::ConnectionStatus::Connected => {
tracing::trace!("Daemon is connected");
true
}
ffi::ConnectionStatus::WrongVersion => {
tracing::warn!("Version mismatch with daemon");
tracing::error!("Version mismatch with daemon, interpreting as disconnected");
false
}
ffi::ConnectionStatus::Disconnected => {
tracing::trace!("Daemon is disconnected");
false
}
ffi::ConnectionStatus::Disconnected => false,
// Fallback since C++ allows any other value.
status => {
tracing::error!("Unknown connection status: `{}`", status.repr);
tracing::error!(
"Unknown connection status, interpreting as disconnected: `{}`",
status.repr
);
false
}
}
@ -1118,7 +1185,7 @@ impl FfiWallet {
}
/// Start the background refresh thread (refreshes every 10 seconds).
fn start_refresh(&mut self) {
fn start_refresh_thread(&mut self) {
self.inner
.pinned()
.startRefresh()
@ -1129,7 +1196,7 @@ impl FfiWallet {
/// Refresh the wallet asynchronously.
/// Same as start_refresh except that the background thread only
/// refreshes once. Maybe?
fn refresh_async(&mut self) {
fn force_background_refresh(&mut self) {
self.inner
.pinned()
.refreshAsync()
@ -1600,3 +1667,21 @@ impl Deref for PendingTransaction {
}
}
}
/// Create a backoff strategy for retrying a function.
/// Default max elapsed time is 5 minutes, default max interval is 30 seconds.
fn backoff(
max_elapsed_time_secs: impl Into<Option<u64>>,
max_interval_secs: impl Into<Option<u64>>,
) -> backoff::ExponentialBackoff {
let max_elapsed_time_secs: Option<u64> = max_elapsed_time_secs.into();
let max_elapsed_time = Duration::from_secs(max_elapsed_time_secs.unwrap_or(5 * 60));
let max_interval_secs: Option<u64> = max_interval_secs.into();
let max_interval = Duration::from_secs(max_interval_secs.unwrap_or(30));
backoff::ExponentialBackoffBuilder::new()
.with_max_elapsed_time(Some(max_elapsed_time))
.with_max_interval(max_interval)
.build()
}

View file

@ -468,6 +468,7 @@ async fn init_monero_wallet(
daemon,
env_config.monero_network,
false,
None,
)
.await
.context("Failed to initialize Monero wallets")?;

View file

@ -535,7 +535,7 @@ async fn init_monero_wallet(
data_dir: &Path,
monero_daemon_address: impl Into<Option<String>>,
env_config: EnvConfig,
_tauri_handle: Option<TauriHandle>,
tauri_handle: Option<TauriHandle>,
) -> Result<Arc<Wallets>> {
let network = env_config.monero_network;
@ -586,6 +586,7 @@ async fn init_monero_wallet(
daemon,
network,
false,
tauri_handle,
)
.await
.context("Failed to initialize Monero wallets")?;

View file

@ -12,19 +12,28 @@ use monero::{Address, Network};
pub use monero_sys::{Daemon, WalletHandle as Wallet};
use uuid::Uuid;
use crate::cli::api::tauri_bindings::TauriHandle;
use super::{BlockHeight, TransferProof, TxHash};
/// Entrance point to the Monero blockchain.
/// You can use this struct to open specific wallets and monitor the blockchain.
pub struct Wallets {
/// The directory we store the wallets in.
wallet_dir: PathBuf,
/// The network we're on.
network: Network,
/// The monero node we connect to.
daemon: Daemon,
/// Keep the main wallet open and synced.
main_wallet: Arc<Wallet>,
/// Whether we're running in regtest mode.
/// Since Network::Regtest isn't a thing we have to use an extra flag.
/// When we're in regtest mode, we need to unplug some safty nets to make the Wallet work.
/// When we're in regtest mode, we need to unplug some safty nets to make the wallet work.
regtest: bool,
/// A handle we use to send status updates to the UI i.e. when
/// waiting for a transaction to be confirmed.
#[expect(dead_code)]
tauri_handle: Option<TauriHandle>,
}
/// A request to watch for a transfer.
@ -59,6 +68,7 @@ impl Wallets {
daemon: Daemon,
network: Network,
regtest: bool,
tauri_handle: Option<TauriHandle>,
) -> Result<Self> {
let main_wallet = Wallet::open_or_create(
wallet_dir.join(&main_wallet_name).display().to_string(),
@ -81,6 +91,7 @@ impl Wallets {
daemon,
main_wallet,
regtest,
tauri_handle,
};
Ok(wallets)

View file

@ -3,7 +3,7 @@ use crate::bitcoin::{
TxEarlyRefund, TxPunish, TxRedeem, TxRefund, Txid,
};
use crate::env::Config;
use crate::monero::wallet::{no_listener, TransferRequest, WatchRequest};
use crate::monero::wallet::{TransferRequest, WatchRequest};
use crate::monero::BlockHeight;
use crate::monero::TransferProof;
use crate::monero_ext::ScalarExt;
@ -571,15 +571,14 @@ impl State3 {
// Ensure that the XMR to be refunded are spendable by awaiting 10 confirmations
// on the lock transaction.
// We pass Mutex<Wallet> instead of a &mut Wallet to
// enable releasing the lock and avoid starving other tasks while waiting
// for the confirmations.
tracing::info!("Waiting for Monero lock transaction to be confirmed");
let transfer_proof_2 = transfer_proof.clone();
monero_wallet
.wait_until_confirmed(
self.lock_xmr_watch_request(transfer_proof_2, 10),
no_listener(),
Some(move |confirmations| {
tracing::debug!(%confirmations, "Monero lock transaction confirmed");
}),
)
.await
.context("Failed to wait for Monero lock transaction to be confirmed")?;

View file

@ -8,7 +8,6 @@ use crate::asb::{EventLoopHandle, LatestRate};
use crate::bitcoin::ExpiredTimelocks;
use crate::common::retry;
use crate::env::Config;
use crate::monero::wallet::no_listener;
use crate::monero::TransferProof;
use crate::protocol::alice::{AliceState, Swap};
use crate::{bitcoin, monero};
@ -301,15 +300,18 @@ where
state3,
} => match state3.expired_timelocks(bitcoin_wallet).await? {
ExpiredTimelocks::None { .. } => {
tracing::info!("Locked Monero, waiting for confirmations");
monero_wallet
.wait_until_confirmed(
state3.lock_xmr_watch_request(transfer_proof.clone(), 1),
no_listener(), // TODO: Add a listener with status updates
Some(|confirmations| {
tracing::debug!(%confirmations, "Monero lock tx got new confirmation")
}),
)
.await
.with_context(|| {
format!(
"Failed to watch for transfer of XMR in transaction {}",
"Failed to wait until Monero transaction was confirmed ({})",
transfer_proof.tx_hash()
)
})?;

View file

@ -1,7 +1,6 @@
mod bitcoind;
mod electrs;
use anyhow::{bail, Context, Result};
use async_trait::async_trait;
use bitcoin_harness::{BitcoindRpcApi, Client};
@ -314,6 +313,7 @@ async fn init_test_wallets(
monero_daemon,
monero::Network::Mainnet,
true,
None,
)
.await
.unwrap();
@ -877,26 +877,33 @@ impl TestContext {
pub async fn stop_alice_monero_wallet_rpc(&self) {
tracing::info!("Killing monerod container");
// Use Docker CLI to forcefully kill the container
let output = tokio::process::Command::new("docker")
.args(&["kill", &self.monerod_container_id])
.args(["kill", &self.monerod_container_id])
.output()
.await
.expect("Failed to execute docker kill command");
if output.status.success() {
tracing::info!("Successfully killed monerod container: {}", &self.monerod_container_id);
tracing::info!(
"Successfully killed monerod container: {}",
&self.monerod_container_id
);
} else {
let stderr = String::from_utf8_lossy(&output.stderr);
tracing::error!("Failed to kill monerod container {}: {}", &self.monerod_container_id, stderr);
tracing::error!(
"Failed to kill monerod container {}: {}",
&self.monerod_container_id,
stderr
);
}
}
pub async fn empty_alice_monero_wallet(&self) {
let burn_address = monero::Address::from_str("49LEH26DJGuCyr8xzRAzWPUryzp7bpccC7Hie1DiwyfJEyUKvMFAethRLybDYrFdU1eHaMkKQpUPebY4WT3cSjEvThmpjPa").unwrap();
let wallet = self.alice_monero_wallet.main_wallet().await;
wallet
.sweep(&burn_address)
.await
@ -905,13 +912,9 @@ impl TestContext {
pub async fn assert_alice_monero_wallet_empty(&self) {
let wallet = self.alice_monero_wallet.main_wallet().await;
assert_eventual_balance(
&*wallet,
Ordering::Equal,
monero::Amount::ZERO,
)
.await
.unwrap();
assert_eventual_balance(&*wallet, Ordering::Equal, monero::Amount::ZERO)
.await
.unwrap();
}
}