fix(swap): Monero wallet thread safety (#281)

* add comment to ConfirmationListener

* swap: always wrap monero::Wallet in tokio::sync::Mutex

Before, monero::Wallet wrapped a Mutex<Client>, and locked
the mutex on each operation. This meant releasing the
lock in between operations, even though we rely on the
operations being executed in order.

To remedy this race condition, we wrap monero::Wallet itself
in a mutex, requiring any caller to hold the lock for the duration
of the operation, including any suboperations.

* work on: releasing the lock while waiting for confirmations

Due to the newly introduced thread safety, we are currently holding
lock to the monero wallet while waiting for confirmations
-- since this takes a lot of time, it starves all other tasks
that do anything with the monero wallet.

In this commit I start implementing a change that enables us to release
the lock to the wallet while waiting for confirmations and only acquire it
when necessary.

This breaks with the current system of passing just a generic client
which implements the MoneroWalletRpc trait (which we use to pass a dummy
client for testing).

This commit is the first step towards a small refactor to that system.

* always pass Wallet instead of a MoneroWalletRpc client

By always passing Arc<Mutex<Wallet>> instead of MoneroWalletRpc clients
directly we can allow the wait_for_confirmations functions to lock the
Mutex and access the client when they need to, while releasing the lock
when waiting for the next tick. This stops the current starving of other
tasks waiting for the lock.

Since we use a dummy client for testing, this required adding a generic
parameter to the Wallet. However, since we specify a default type,
this doesn't actually require generic handling anywhere.

* add warning comment to monero::wallet::Wallet::from_dummy

* add timeout when waiting for monero lock during quote

This commit adds a timeout after 60 seconds when trying to acquire
the lock on the monero wallet while making a quote.
Should a timout occur, we return an error.
This makes sure that we get _some_ return value and that
starvation is noticed.

* fix lints, don't keep lock during loop body in wait_for_confirmations

* always immediately drop lock in wait_for_transfer

* fix clippy lints

* open wallet instead of failing when we can't create from keys

When we fail to create a monero wallet from keys, we will now try
to open it instead. I also renamed the method to be more consistent
with Wallet::open_or_create.

These changes are mostly taken from #260.

* improve documentation on monero::Wallet

* use Wallet::open instead of Wallet::Client::open

* use create_from_keys_and_sweep in bob's redeem_xmr

This commit deduplicates logic by using
create_from_keys_and_sweep_to in bob's redeem_xmr
and also adds the create_from_keys_and_sweep_to
method while making create_from_keys_and_sweep a
wrapper around it.

* add error context and improve logging

* fix deadlock in wait_for_confirmation_with, add timout to test
This commit is contained in:
Raphael 2025-04-24 15:34:01 +02:00 committed by GitHub
parent e8084d65ec
commit f1e5cdfbfe
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 384 additions and 262 deletions

View file

@ -23,7 +23,8 @@ use std::convert::{Infallible, TryInto};
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio::time::timeout;
use uuid::Uuid;
/// The time-to-live for quotes in the cache
@ -44,7 +45,7 @@ where
swarm: libp2p::Swarm<Behaviour<LR>>,
env_config: env::Config,
bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>,
monero_wallet: Arc<Mutex<monero::Wallet>>,
db: Arc<dyn Database + Send + Sync>,
latest_rate: LR,
min_buy: bitcoin::Amount,
@ -131,7 +132,7 @@ where
swarm: Swarm<Behaviour<LR>>,
env_config: env::Config,
bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>,
monero_wallet: Arc<Mutex<monero::Wallet>>,
db: Arc<dyn Database + Send + Sync>,
latest_rate: LR,
min_buy: bitcoin::Amount,
@ -231,7 +232,7 @@ where
}
};
let wallet_snapshot = match WalletSnapshot::capture(&self.bitcoin_wallet, &self.monero_wallet, &self.external_redeem_address, btc).await {
let wallet_snapshot = match WalletSnapshot::capture(&self.bitcoin_wallet, &*self.monero_wallet.lock().await, &self.external_redeem_address, btc).await {
Ok(wallet_snapshot) => wallet_snapshot,
Err(error) => {
tracing::error!("Swap request will be ignored because we were unable to create wallet snapshot for swap: {:#}", error);
@ -516,6 +517,11 @@ where
min_buy: bitcoin::Amount,
max_buy: bitcoin::Amount,
) -> Result<Arc<BidQuote>, Arc<anyhow::Error>> {
/// This is how long we maximally wait for the wallet lock
/// -- else the quote will be out of date and we will return
/// an error.
const MAX_WAIT_DURATION: Duration = Duration::from_secs(60);
let ask_price = self
.latest_rate
.latest_rate()
@ -523,8 +529,9 @@ where
.ask()
.map_err(|e| Arc::new(e.context("Failed to compute asking price")))?;
let balance = self
.monero_wallet
let balance = timeout(MAX_WAIT_DURATION, self.monero_wallet.lock())
.await
.context("Timeout while waiting for lock on monero wallet while making quote")?
.get_balance()
.await
.map_err(|e| Arc::new(e.context("Failed to get Monero balance")))?;

View file

@ -6,6 +6,7 @@ use anyhow::{bail, Result};
use libp2p::PeerId;
use std::convert::TryInto;
use std::sync::Arc;
use tokio::sync::Mutex;
use uuid::Uuid;
#[derive(Debug, thiserror::Error)]
@ -26,7 +27,7 @@ pub enum Error {
pub async fn refund(
swap_id: Uuid,
bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>,
monero_wallet: Arc<Mutex<monero::Wallet>>,
db: Arc<dyn Database>,
) -> Result<AliceState> {
let state = db.get_state(swap_id).await?.try_into()?;
@ -73,7 +74,7 @@ pub async fn refund(
state3
.refund_xmr(
&monero_wallet,
monero_wallet.clone(),
monero_wallet_restore_blockheight,
swap_id.to_string(),
spend_key,

View file

@ -122,11 +122,11 @@ pub async fn main() -> Result<()> {
// Initialize Monero wallet
let monero_wallet = init_monero_wallet(&config, env_config).await?;
let monero_address = monero_wallet.get_main_address();
let monero_address = monero_wallet.lock().await.get_main_address();
tracing::info!(%monero_address, "Monero wallet address");
// Check Monero balance
let monero = monero_wallet.get_balance().await?;
let monero = monero_wallet.lock().await.get_balance().await?;
match (monero.balance, monero.unlocked_balance) {
(0, _) => {
tracing::warn!(
@ -317,7 +317,7 @@ pub async fn main() -> Result<()> {
}
Command::Balance => {
let monero_wallet = init_monero_wallet(&config, env_config).await?;
let monero_balance = monero_wallet.get_balance().await?;
let monero_balance = monero_wallet.lock().await.get_balance().await?;
tracing::info!(%monero_balance);
let bitcoin_wallet = init_bitcoin_wallet(&config, &seed, env_config).await?;
@ -419,7 +419,7 @@ async fn init_bitcoin_wallet(
async fn init_monero_wallet(
config: &Config,
env_config: swap::env::Config,
) -> Result<monero::Wallet> {
) -> Result<tokio::sync::Mutex<monero::Wallet>> {
tracing::debug!("Opening Monero wallet");
let wallet = monero::Wallet::open_or_create(
config.monero.wallet_rpc_url.clone(),
@ -428,7 +428,7 @@ async fn init_monero_wallet(
)
.await?;
Ok(wallet)
Ok(tokio::sync::Mutex::new(wallet))
}
/// This struct is used to extract swap details from the database and print them in a table format

View file

@ -188,7 +188,7 @@ pub struct Context {
pub tasks: Arc<PendingTaskList>,
tauri_handle: Option<TauriHandle>,
bitcoin_wallet: Option<Arc<bitcoin::Wallet>>,
monero_wallet: Option<Arc<monero::Wallet>>,
monero_wallet: Option<Arc<TokioMutex<monero::Wallet>>>,
monero_rpc_process: Option<Arc<SyncMutex<monero::WalletRpcProcess>>>,
tor_client: Option<Arc<TorClient<TokioRustlsRuntime>>>,
}
@ -387,7 +387,10 @@ impl ContextBuilder {
]),
);
Ok((Some(Arc::new(wlt)), Some(Arc::new(SyncMutex::new(prc)))))
Ok((
Some(Arc::new(TokioMutex::new(wlt))),
Some(Arc::new(SyncMutex::new(prc))),
))
}
None => Ok((None, None)),
}
@ -482,7 +485,7 @@ impl Context {
env_config: EnvConfig,
db_path: PathBuf,
bob_bitcoin_wallet: Arc<bitcoin::Wallet>,
bob_monero_wallet: Arc<monero::Wallet>,
bob_monero_wallet: Arc<TokioMutex<monero::Wallet>>,
) -> Self {
let config = Config::for_harness(seed, env_config);

View file

@ -133,7 +133,7 @@ impl TauriHandle {
let request_id = Uuid::new_v4();
let now_secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("it is later than the begin of the unix epoch")
.expect("system time to be after unix epoch (1970-01-01)")
.as_secs();
let expiration_ts = now_secs + timeout_secs;

View file

@ -10,16 +10,26 @@ use std::future::Future;
use std::ops::Div;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::Interval;
use url::Url;
/// This is our connection to the monero blockchain which we use
/// all over the codebase, mostly as `Arc<Mutex<Wallet>>`.
///
/// It represents a connection to a monero-wallet-rpc daemon,
/// which can load a (single) wallet at a time.
/// This struct contains methods for opening, closing, creating
/// wallet and for sending funds from the loaded wallet.
#[derive(Debug)]
pub struct Wallet {
inner: Mutex<wallet::Client>,
pub struct Wallet<C = wallet::Client> {
inner: C,
network: Network,
name: String,
/// The file name of the main wallet (the first wallet loaded)
main_wallet: String,
/// The first address of the main wallet
main_address: monero::Address,
sync_interval: Duration,
}
@ -31,7 +41,8 @@ impl Wallet {
match client.open_wallet(name.clone()).await {
Err(error) => {
tracing::debug!(%error, "Open wallet response error");
tracing::debug!(%error, "Failed to open wallet, trying to create instead");
client.create_wallet(name.clone(), "English".to_owned()).await.context(
"Unable to create Monero wallet, please ensure that the monero-wallet-rpc is available",
)?;
@ -50,32 +61,61 @@ impl Wallet {
monero::Address::from_str(client.get_address(0).await?.address.as_str())?;
Ok(Self {
inner: Mutex::new(client),
inner: client,
network: env_config.monero_network,
name,
main_wallet: name,
main_address,
sync_interval: env_config.monero_sync_interval(),
})
}
/// Re-open the wallet using the internally stored name.
/// This can be used to create dummy wallet for testing purposes.
/// Warning: filled with non-sense values, don't use for anything
/// but as a wrapper around your dummy client.
#[cfg(test)]
fn from_dummy<T: monero_rpc::wallet::MoneroWalletRpc<reqwest::Client> + Sync>(
client: T,
network: Network,
) -> Wallet<T> {
// Here we make up some values just so we can use the wallet in tests
// Todo: verify this works
use curve25519_dalek::scalar::Scalar;
let privkey = PrivateKey::from_scalar(Scalar::one());
let pubkey = PublicKey::from_private_key(&privkey);
Wallet {
inner: client,
network,
sync_interval: Duration::from_secs(100),
main_wallet: "foo".into(),
main_address: Address::standard(network, pubkey, pubkey),
}
}
/// Re-open the internally stored wallet from it's file.
pub async fn re_open(&self) -> Result<()> {
self.inner
.lock()
self.open(self.main_wallet.clone())
.await
.open_wallet(self.name.clone())
.await?;
.context("Failed to re-open main wallet")?;
Ok(())
}
/// Open a monero wallet from a file.
pub async fn open(&self, filename: String) -> Result<()> {
self.inner.lock().await.open_wallet(filename).await?;
self.inner
.open_wallet(filename)
.await
.context("Failed to open ")?;
Ok(())
}
/// Close the wallet and open (load) another wallet by generating it from
/// keys. The generated wallet will remain loaded.
pub async fn create_from_and_load(
///
/// If the wallet already exists, it will just be loaded instead.
pub async fn open_or_create_from_keys(
&self,
file_name: String,
private_spend_key: PrivateKey,
@ -87,18 +127,18 @@ impl Wallet {
let address = Address::standard(self.network, public_spend_key, public_view_key);
let wallet = self.inner.lock().await;
// Properly close the wallet before generating the other wallet to ensure that
// it saves its state correctly
let _ = wallet
let _ = self
.inner
.close_wallet()
.await
.context("Failed to close wallet")?;
let _ = wallet
let result = self
.inner
.generate_from_keys(
file_name,
file_name.clone(),
address.to_string(),
private_spend_key.to_string(),
PrivateKey::from(private_view_key).to_string(),
@ -106,25 +146,65 @@ impl Wallet {
String::from(""),
true,
)
.await
.context("Failed to generate new wallet from keys")?;
.await;
Ok(())
// If we failed to create the wallet because it already exists,
// we just try to open it instead
match result {
Ok(_) => Ok(()),
Err(error) if error.to_string().contains("Wallet already exists") => {
tracing::debug!(
monero_wallet_name = &file_name,
"Cannot create wallet because it already exists, loading instead"
);
self.open(file_name)
.await
.context("Failed to create wallet from keys ('Wallet already exists'), subsequent attempt to open failed, too")
}
Err(error) => Err(error).context("Failed to create wallet from keys"),
}
}
/// Close the wallet and open (load) another wallet by generating it from
/// keys. The generated wallet will be opened, all funds sweeped to the
/// main_address and then the wallet will be re-loaded using the internally
/// stored name.
/// A wrapper around [`create_from_keys_and_sweep_to`] that sweeps all funds to the
/// main address of the wallet.
/// For the ASB this is the main wallet.
/// For the CLI, I don't know which wallet it is.
///
/// Returns the tx hashes of the sweep.
pub async fn create_from_keys_and_sweep(
&self,
file_name: String,
private_spend_key: PrivateKey,
private_view_key: PrivateViewKey,
restore_height: BlockHeight,
) -> Result<()> {
) -> Result<Vec<TxHash>> {
self.create_from_keys_and_sweep_to(
file_name,
private_spend_key,
private_view_key,
restore_height,
self.main_address,
)
.await
}
/// Close the wallet and open (load) another wallet by generating it from
/// keys. The generated wallet will be opened, all funds sweeped to the
/// specified destination address and then the original wallet will be re-loaded using the internally
/// stored name.
///
/// Returns the tx hashes of the sweep.
pub async fn create_from_keys_and_sweep_to(
&self,
file_name: String,
private_spend_key: PrivateKey,
private_view_key: PrivateViewKey,
restore_height: BlockHeight,
destination_address: Address,
) -> Result<Vec<TxHash>> {
// Close the default wallet, generate the new wallet from the keys and load it
self.create_from_and_load(
self.open_or_create_from_keys(
file_name,
private_spend_key,
private_view_key,
@ -133,48 +213,30 @@ impl Wallet {
.await?;
// Refresh the generated wallet
if let Err(error) = self.refresh(20).await {
return Err(anyhow::anyhow!(error)
.context("Failed to refresh generated wallet for sweeping to default wallet"));
}
self.refresh(20)
.await
.context("Failed to refresh generated wallet for sweeping to destination address")?;
// Sweep all the funds from the generated wallet to the default wallet
// Sweep all the funds from the generated wallet to the specified destination address
let sweep_result = self
.inner
.lock()
.sweep_all(destination_address)
.await
.sweep_all(self.main_address.to_string())
.await;
.context("Failed to transfer Monero to destination address")?;
match sweep_result {
Ok(sweep_all) => {
for tx in sweep_all.tx_hash_list {
tracing::info!(
%tx,
monero_address = %self.main_address,
"Monero transferred back to default wallet");
}
}
Err(error) => {
return Err(
anyhow::anyhow!(error).context("Failed to transfer Monero to default wallet")
);
}
for tx in &sweep_result {
tracing::info!(
%tx,
monero_address = %destination_address,
"Monero transferred to destination address");
}
let _ = self
.inner
.lock()
.await
.open_wallet(self.name.clone())
.await?;
self.re_open().await?;
Ok(())
Ok(sweep_result)
}
/// Transfer a specified amount of monero to a specified address.
pub async fn transfer(&self, request: TransferRequest) -> Result<TransferProof> {
let inner = self.inner.lock().await;
let TransferRequest {
public_spend_key,
public_view_key,
@ -184,7 +246,8 @@ impl Wallet {
let destination_address =
Address::standard(self.network, public_spend_key, public_view_key.into());
let res = inner
let res = self
.inner
.transfer_single(0, amount.as_piconero(), &destination_address.to_string())
.await?;
@ -202,60 +265,9 @@ impl Wallet {
))
}
/// Wait until the specified transfer has been completed or failed.
pub async fn watch_for_transfer(&self, request: WatchRequest) -> Result<(), InsufficientFunds> {
self.watch_for_transfer_with(request, None).await
}
/// Wait until the specified transfer has been completed or failed and listen to each new confirmation.
#[allow(clippy::too_many_arguments)]
pub async fn watch_for_transfer_with(
&self,
request: WatchRequest,
listener: Option<ConfirmationListener>,
) -> Result<(), InsufficientFunds> {
let WatchRequest {
conf_target,
public_view_key,
public_spend_key,
transfer_proof,
expected,
} = request;
let txid = transfer_proof.tx_hash();
tracing::info!(
%txid,
target_confirmations = %conf_target,
"Waiting for Monero transaction finality"
);
let address = Address::standard(self.network, public_spend_key, public_view_key.into());
let check_interval = tokio::time::interval(self.sync_interval.div(10));
wait_for_confirmations_with(
&self.inner,
transfer_proof,
address,
expected,
conf_target,
check_interval,
self.name.clone(),
listener,
)
.await?;
Ok(())
}
/// Send all funds from the currently loaded wallet to a specified address.
pub async fn sweep_all(&self, address: Address) -> Result<Vec<TxHash>> {
let sweep_all = self
.inner
.lock()
.await
.sweep_all(address.to_string())
.await?;
let sweep_all = self.inner.sweep_all(address.to_string()).await?;
let tx_hashes = sweep_all.tx_hash_list.into_iter().map(TxHash).collect();
Ok(tx_hashes)
@ -263,11 +275,11 @@ impl Wallet {
/// Get the balance of the primary account.
pub async fn get_balance(&self) -> Result<wallet::GetBalance> {
Ok(self.inner.lock().await.get_balance(0).await?)
Ok(self.inner.get_balance(0).await?)
}
pub async fn block_height(&self) -> Result<BlockHeight> {
Ok(self.inner.lock().await.get_height().await?)
Ok(self.inner.get_height().await?)
}
pub fn get_main_address(&self) -> Address {
@ -278,13 +290,13 @@ impl Wallet {
const RETRY_INTERVAL: Duration = Duration::from_secs(1);
for i in 1..=max_attempts {
tracing::info!(name = %self.name, attempt=i, "Syncing Monero wallet");
tracing::info!(name = %self.main_wallet, attempt=i, "Syncing Monero wallet");
let result = self.inner.lock().await.refresh().await;
let result = self.inner.refresh().await;
match result {
Ok(refreshed) => {
tracing::info!(name = %self.name, "Monero wallet synced");
tracing::info!(name = %self.main_wallet, "Monero wallet synced");
return Ok(refreshed);
}
Err(error) => {
@ -293,15 +305,15 @@ impl Wallet {
// We would not want to fail here if the height is not available
// as it is not critical for the operation of the wallet.
// We can just log a warning and continue.
let height = match self.inner.lock().await.get_height().await {
let height = match self.inner.get_height().await {
Ok(height) => height.to_string(),
Err(_) => {
tracing::warn!(name = %self.name, "Failed to fetch Monero wallet height during sync");
tracing::warn!(name = %self.main_wallet, "Failed to fetch Monero wallet height during sync");
"unknown".to_string()
}
};
tracing::warn!(attempt=i, %height, %attempts_left, name = %self.name, %error, "Failed to sync Monero wallet");
tracing::warn!(attempt=i, %height, %attempts_left, name = %self.main_wallet, %error, "Failed to sync Monero wallet");
if attempts_left == 0 {
return Err(error.into());
@ -315,6 +327,61 @@ impl Wallet {
}
}
/// Wait until the specified transfer has been completed or failed.
pub async fn watch_for_transfer(
wallet: Arc<Mutex<Wallet>>,
request: WatchRequest,
) -> Result<(), InsufficientFunds> {
watch_for_transfer_with(wallet, request, None).await
}
/// Wait until the specified transfer has been completed or failed and listen to each new confirmation.
#[allow(clippy::too_many_arguments)]
pub async fn watch_for_transfer_with(
wallet: Arc<Mutex<Wallet>>,
request: WatchRequest,
listener: Option<ConfirmationListener>,
) -> Result<(), InsufficientFunds> {
let WatchRequest {
conf_target,
public_view_key,
public_spend_key,
transfer_proof,
expected,
} = request;
let txid = transfer_proof.tx_hash();
tracing::info!(
%txid,
target_confirmations = %conf_target,
"Waiting for Monero transaction finality"
);
let address = Address::standard(
wallet.lock().await.network,
public_spend_key,
public_view_key.into(),
);
let check_interval = tokio::time::interval(wallet.lock().await.sync_interval.div(10));
let wallet_name = wallet.lock().await.main_wallet.clone();
wait_for_confirmations_with(
wallet.clone(),
transfer_proof,
address,
expected,
conf_target,
check_interval,
wallet_name,
listener,
)
.await?;
Ok(())
}
#[derive(Debug)]
pub struct TransferRequest {
pub public_spend_key: PublicKey,
@ -331,6 +398,13 @@ pub struct WatchRequest {
pub expected: Amount,
}
/// This is a shorthand for the dynamic type we use to pass listeners to
/// i.e. the `wait_for_confirmations` function. It is basically
/// an `async fn` which takes a `u64` and returns nothing, but in dynamic.
///
/// We use this to pass a listener that sends events to the tauri
/// frontend to show upates to the number of confirmations that
/// a tx has.
type ConfirmationListener =
Box<dyn Fn(u64) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> + Send + 'static>;
@ -338,7 +412,7 @@ type ConfirmationListener =
async fn wait_for_confirmations_with<
C: monero_rpc::wallet::MoneroWalletRpc<reqwest::Client> + Sync,
>(
client: &Mutex<C>,
wallet: Arc<Mutex<Wallet<C>>>,
transfer_proof: TransferProof,
to_address: Address,
expected: Amount,
@ -353,16 +427,21 @@ async fn wait_for_confirmations_with<
check_interval.tick().await; // tick() at the beginning of the loop so every `continue` tick()s as well
let txid = transfer_proof.tx_hash().to_string();
let client = client.lock().await;
let tx = match client
// Make sure to drop the lock before matching on the result
// otherwise it will deadlock on the error code -13 case
let result = wallet
.lock()
.await
.inner
.check_tx_key(
txid.clone(),
transfer_proof.tx_key.to_string(),
to_address.to_string(),
)
.await
{
.await;
let tx = match result {
Ok(proof) => proof,
Err(jsonrpc::Error::JsonRpc(jsonrpc::JsonRpcError {
code: -1,
@ -381,7 +460,13 @@ async fn wait_for_confirmations_with<
txid
);
if let Err(err) = client.open_wallet(wallet_name.clone()).await {
if let Err(err) = wallet
.lock()
.await
.inner
.open_wallet(wallet_name.clone())
.await
{
tracing::warn!(
%err,
"Failed to open wallet `{}` to continue monitoring of Monero transaction {}",
@ -437,12 +522,13 @@ mod tests {
use crate::tracing_ext::capture_logs;
use monero_rpc::wallet::CheckTxKey;
use std::sync::atomic::{AtomicU32, Ordering};
use tokio::sync::Mutex;
use tracing::metadata::LevelFilter;
async fn wait_for_confirmations<
C: monero_rpc::wallet::MoneroWalletRpc<reqwest::Client> + Sync,
>(
client: &Mutex<C>,
client: Arc<Mutex<Wallet<C>>>,
transfer_proof: TransferProof,
to_address: Address,
expected: Amount,
@ -465,13 +551,16 @@ mod tests {
#[tokio::test]
async fn given_exact_confirmations_does_not_fetch_tx_again() {
let client = Mutex::new(DummyClient::new(vec![Ok(CheckTxKey {
confirmations: 10,
received: 100,
})]));
let wallet = Arc::new(Mutex::new(Wallet::from_dummy(
DummyClient::new(vec![Ok(CheckTxKey {
confirmations: 10,
received: 100,
})]),
Network::Testnet,
)));
let result = wait_for_confirmations(
&client,
wallet.clone(),
TransferProof::new(TxHash("<FOO>".to_owned()), PrivateKey {
scalar: crate::monero::Scalar::random(&mut rand::thread_rng())
}),
@ -485,9 +574,10 @@ mod tests {
assert!(result.is_ok());
assert_eq!(
client
wallet
.lock()
.await
.inner
.check_tx_key_invocations
.load(Ordering::SeqCst),
1
@ -498,31 +588,34 @@ mod tests {
async fn visual_log_check() {
let writer = capture_logs(LevelFilter::INFO);
let client = Mutex::new(DummyClient::new(vec![
Ok(CheckTxKey {
confirmations: 1,
received: 100,
}),
Ok(CheckTxKey {
confirmations: 1,
received: 100,
}),
Ok(CheckTxKey {
confirmations: 1,
received: 100,
}),
Ok(CheckTxKey {
confirmations: 3,
received: 100,
}),
Ok(CheckTxKey {
confirmations: 5,
received: 100,
}),
]));
let client = Arc::new(Mutex::new(Wallet::from_dummy(
DummyClient::new(vec![
Ok(CheckTxKey {
confirmations: 1,
received: 100,
}),
Ok(CheckTxKey {
confirmations: 1,
received: 100,
}),
Ok(CheckTxKey {
confirmations: 1,
received: 100,
}),
Ok(CheckTxKey {
confirmations: 3,
received: 100,
}),
Ok(CheckTxKey {
confirmations: 5,
received: 100,
}),
]),
Network::Testnet,
)));
wait_for_confirmations(
&client,
client.clone(),
TransferProof::new(TxHash("<FOO>".to_owned()), PrivateKey {
scalar: crate::monero::Scalar::random(&mut rand::thread_rng())
}),
@ -548,28 +641,31 @@ mod tests {
async fn reopens_wallet_in_case_not_available() {
let writer = capture_logs(LevelFilter::DEBUG);
let client = Mutex::new(DummyClient::new(vec![
Ok(CheckTxKey {
confirmations: 1,
received: 100,
}),
Ok(CheckTxKey {
confirmations: 1,
received: 100,
}),
Err((-13, "No wallet file".to_owned())),
Ok(CheckTxKey {
confirmations: 3,
received: 100,
}),
Ok(CheckTxKey {
confirmations: 5,
received: 100,
}),
]));
let client = Arc::new(Mutex::new(Wallet::from_dummy(
DummyClient::new(vec![
Ok(CheckTxKey {
confirmations: 1,
received: 100,
}),
Ok(CheckTxKey {
confirmations: 1,
received: 100,
}),
Err((-13, "No wallet file".to_owned())),
Ok(CheckTxKey {
confirmations: 3,
received: 100,
}),
Ok(CheckTxKey {
confirmations: 5,
received: 100,
}),
]),
Network::Testnet,
)));
wait_for_confirmations(
&client,
tokio::time::timeout(Duration::from_secs(30), wait_for_confirmations(
client.clone(),
TransferProof::new(TxHash("<FOO>".to_owned()), PrivateKey {
scalar: crate::monero::Scalar::random(&mut rand::thread_rng())
}),
@ -578,8 +674,9 @@ mod tests {
5,
tokio::time::interval(Duration::from_millis(10)),
"foo-wallet".to_owned(),
)
))
.await
.expect("timeout: shouldn't take more than 10 seconds")
.unwrap();
assert_eq!(
@ -594,6 +691,7 @@ DEBUG swap::monero::wallet: No wallet loaded. Opening wallet `foo-wallet` to con
client
.lock()
.await
.inner
.open_wallet_invocations
.load(Ordering::SeqCst),
1

View file

@ -4,6 +4,7 @@ use crate::env::Config;
use crate::protocol::Database;
use crate::{asb, bitcoin, monero};
use std::sync::Arc;
use tokio::sync::Mutex;
use uuid::Uuid;
pub use self::state::*;
@ -16,7 +17,7 @@ pub struct Swap {
pub state: AliceState,
pub event_loop_handle: asb::EventLoopHandle,
pub bitcoin_wallet: Arc<bitcoin::Wallet>,
pub monero_wallet: Arc<monero::Wallet>,
pub monero_wallet: Arc<Mutex<monero::Wallet>>,
pub env_config: Config,
pub swap_id: Uuid,
pub db: Arc<dyn Database + Send + Sync>,

View file

@ -3,7 +3,7 @@ use crate::bitcoin::{
TxPunish, TxRedeem, TxRefund, Txid,
};
use crate::env::Config;
use crate::monero::wallet::{TransferRequest, WatchRequest};
use crate::monero::wallet::{watch_for_transfer, TransferRequest, WatchRequest};
use crate::monero::TransferProof;
use crate::monero_ext::ScalarExt;
use crate::protocol::{Message0, Message1, Message2, Message3, Message4, CROSS_CURVE_PROOF_SYSTEM};
@ -14,6 +14,8 @@ use rand::{CryptoRng, RngCore};
use serde::{Deserialize, Serialize};
use sigma_fun::ext::dl_secp256k1_ed25519_eq::CrossCurveDLEQProof;
use std::fmt;
use std::sync::Arc;
use tokio::sync::Mutex;
use uuid::Uuid;
#[derive(Debug, Clone, PartialEq)]
@ -507,7 +509,7 @@ impl State3 {
pub async fn refund_xmr(
&self,
monero_wallet: &monero::Wallet,
monero_wallet: Arc<Mutex<monero::Wallet>>,
monero_wallet_restore_blockheight: BlockHeight,
file_name: String,
spend_key: monero::PrivateKey,
@ -516,12 +518,19 @@ impl State3 {
let view_key = self.v;
// Ensure that the XMR to be refunded are spendable by awaiting 10 confirmations
// on the lock transaction
monero_wallet
.watch_for_transfer(self.lock_xmr_watch_request(transfer_proof, 10))
.await?;
// on the lock transaction.
// We pass Mutex<Wallet> instead of a &mut Wallet to
// enable releasing the lock and avoid starving other tasks while waiting
// for the confirmations.
watch_for_transfer(
monero_wallet.clone(),
self.lock_xmr_watch_request(transfer_proof, 10),
)
.await?;
monero_wallet
.lock()
.await
.create_from_keys_and_sweep(
file_name,
spend_key,

View file

@ -1,5 +1,6 @@
//! Run an XMR/BTC swap in the role of Alice.
//! Alice holds XMR and wishes receive BTC.
use std::sync::Arc;
use std::time::Duration;
use crate::asb::{EventLoopHandle, LatestRate};
@ -10,6 +11,7 @@ use crate::{bitcoin, monero};
use ::bitcoin::consensus::encode::serialize_hex;
use anyhow::{bail, Context, Result};
use tokio::select;
use tokio::sync::Mutex;
use tokio::time::timeout;
use uuid::Uuid;
@ -37,7 +39,7 @@ where
current_state,
&mut swap.event_loop_handle,
swap.bitcoin_wallet.as_ref(),
swap.monero_wallet.as_ref(),
swap.monero_wallet.clone(),
&swap.env_config,
rate_service.clone(),
)
@ -56,7 +58,7 @@ async fn next_state<LR>(
state: AliceState,
event_loop_handle: &mut EventLoopHandle,
bitcoin_wallet: &bitcoin::Wallet,
monero_wallet: &monero::Wallet,
monero_wallet: Arc<Mutex<monero::Wallet>>,
env_config: &Config,
mut rate_service: LR,
) -> Result<AliceState>
@ -135,6 +137,7 @@ where
// Record the current monero wallet block height so we don't have to scan from
// block 0 for scenarios where we create a refund wallet.
let monero_wallet_restore_blockheight = monero_wallet
.lock().await
.block_height()
.await
.context("Failed to get Monero wallet block height")
@ -142,6 +145,7 @@ where
// Lock the Monero
monero_wallet
.lock().await
.transfer(state3.lock_xmr_transfer_request())
.await
.map(|proof| Some((monero_wallet_restore_blockheight, proof)))
@ -184,15 +188,17 @@ where
state3,
} => match state3.expired_timelocks(bitcoin_wallet).await? {
ExpiredTimelocks::None { .. } => {
monero_wallet
.watch_for_transfer(state3.lock_xmr_watch_request(transfer_proof.clone(), 1))
.await
.with_context(|| {
format!(
"Failed to watch for transfer of XMR in transaction {}",
transfer_proof.tx_hash()
)
})?;
monero::wallet::watch_for_transfer(
monero_wallet.clone(),
state3.lock_xmr_watch_request(transfer_proof.clone(), 1),
)
.await
.with_context(|| {
format!(
"Failed to watch for transfer of XMR in transaction {}",
transfer_proof.tx_hash()
)
})?;
AliceState::XmrLocked {
monero_wallet_restore_blockheight,
@ -438,7 +444,7 @@ where
|| async {
state3
.refund_xmr(
monero_wallet,
monero_wallet.clone(),
monero_wallet_restore_blockheight,
swap_id.to_string(),
spend_key,

View file

@ -1,6 +1,7 @@
use std::sync::Arc;
use anyhow::Result;
use tokio::sync::Mutex;
use uuid::Uuid;
use crate::cli::api::tauri_bindings::TauriHandle;
@ -19,7 +20,7 @@ pub struct Swap {
pub event_loop_handle: cli::EventLoopHandle,
pub db: Arc<dyn Database + Send + Sync>,
pub bitcoin_wallet: Arc<bitcoin::Wallet>,
pub monero_wallet: Arc<monero::Wallet>,
pub monero_wallet: Arc<Mutex<monero::Wallet>>,
pub env_config: env::Config,
pub id: Uuid,
pub monero_receive_address: monero::Address,
@ -32,7 +33,7 @@ impl Swap {
db: Arc<dyn Database + Send + Sync>,
id: Uuid,
bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>,
monero_wallet: Arc<Mutex<monero::Wallet>>,
env_config: env::Config,
event_loop_handle: cli::EventLoopHandle,
monero_receive_address: monero::Address,
@ -60,7 +61,7 @@ impl Swap {
db: Arc<dyn Database + Send + Sync>,
id: Uuid,
bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>,
monero_wallet: Arc<Mutex<monero::Wallet>>,
env_config: env::Config,
event_loop_handle: cli::EventLoopHandle,
monero_receive_address: monero::Address,

View file

@ -652,6 +652,7 @@ impl State5 {
pub fn tx_lock_id(&self) -> bitcoin::Txid {
self.tx_lock.txid()
}
pub async fn redeem_xmr(
&self,
monero_wallet: &monero::Wallet,
@ -661,34 +662,17 @@ impl State5 {
let (spend_key, view_key) = self.xmr_keys();
tracing::info!(%wallet_file_name, "Generating and opening Monero wallet from the extracted keys to redeem the Monero");
if let Err(e) = monero_wallet
.create_from_and_load(
let tx_hashes = monero_wallet
.create_from_keys_and_sweep_to(
wallet_file_name.clone(),
spend_key,
view_key,
self.monero_wallet_restore_blockheight,
monero_receive_address,
)
.await
{
// In case we failed to refresh/sweep, when resuming the wallet might already
// exist! This is a very unlikely scenario, but if we don't take care of it we
// might not be able to ever transfer the Monero.
tracing::warn!("Failed to generate monero wallet from keys: {:#}", e);
tracing::info!(%wallet_file_name,
"Falling back to trying to open the wallet if it already exists",
);
monero_wallet.open(wallet_file_name).await?;
}
// Ensure that the generated wallet is synced so we have a proper balance
monero_wallet.refresh(20).await?;
// Sweep (transfer all funds) to the Bobs Monero redeem address
let tx_hashes = monero_wallet.sweep_all(monero_receive_address).await?;
for tx_hash in &tx_hashes {
tracing::info!(%monero_receive_address, txid=%tx_hash.0, "Successfully transferred XMR to wallet");
}
.context("Failed to redeem Monero")?;
Ok(tx_hashes)
}

View file

@ -13,6 +13,7 @@ use crate::{bitcoin, monero};
use anyhow::{bail, Context as AnyContext, Result};
use std::sync::Arc;
use tokio::select;
use tokio::sync::Mutex;
use uuid::Uuid;
const PRE_BTC_LOCK_APPROVAL_TIMEOUT_SECS: u64 = 120;
@ -69,7 +70,7 @@ pub async fn run_until(
&mut swap.event_loop_handle,
swap.db.clone(),
swap.bitcoin_wallet.as_ref(),
swap.monero_wallet.as_ref(),
swap.monero_wallet.clone(),
swap.monero_receive_address,
swap.event_emitter.clone(),
)
@ -96,7 +97,7 @@ async fn next_state(
event_loop_handle: &mut EventLoopHandle,
db: Arc<dyn Database + Send + Sync>,
bitcoin_wallet: &bitcoin::Wallet,
monero_wallet: &monero::Wallet,
monero_wallet: Arc<Mutex<monero::Wallet>>,
monero_receive_address: monero::Address,
event_emitter: Option<TauriHandle>,
) -> Result<BobState> {
@ -148,7 +149,8 @@ async fn next_state(
// If the Monero transaction gets confirmed before Bob comes online again then
// Bob would record a wallet-height that is past the lock transaction height,
// which can lead to the wallet not detect the transaction.
let monero_wallet_restore_blockheight = monero_wallet.block_height().await?;
let monero_wallet_restore_blockheight =
monero_wallet.lock().await.block_height().await?;
let xmr_receive_amount = state2.xmr;
@ -324,7 +326,8 @@ async fn next_state(
let watch_request = state.lock_xmr_watch_request(lock_transfer_proof);
// We pass a listener to the function that get's called everytime a new confirmation is spotted.
let watch_future = monero_wallet.watch_for_transfer_with(
let watch_future = monero::wallet::watch_for_transfer_with(
monero_wallet.clone(),
watch_request,
Some(Box::new(move |confirmations| {
// Clone them again so that we can move them again
@ -442,7 +445,11 @@ async fn next_state(
event_emitter.emit_swap_progress_event(swap_id, TauriSwapProgressEvent::BtcRedeemed);
let xmr_redeem_txids = state
.redeem_xmr(monero_wallet, swap_id.to_string(), monero_receive_address)
.redeem_xmr(
&*monero_wallet.lock().await,
swap_id.to_string(),
monero_receive_address,
)
.await?;
event_emitter.emit_swap_progress_event(
@ -542,7 +549,11 @@ async fn next_state(
let state5 = state.attempt_cooperative_redeem(s_a);
match state5
.redeem_xmr(monero_wallet, swap_id.to_string(), monero_receive_address)
.redeem_xmr(
&*monero_wallet.lock().await,
swap_id.to_string(),
monero_receive_address,
)
.await
{
Ok(xmr_redeem_txids) => {

View file

@ -32,6 +32,7 @@ use testcontainers::clients::Cli;
use testcontainers::{Container, RunnableImage};
use tokio::sync::mpsc;
use tokio::sync::mpsc::Receiver;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio::time::{interval, timeout};
use tracing_subscriber::util::SubscriberInitExt;
@ -224,7 +225,7 @@ async fn start_alice(
listen_address: Multiaddr,
env_config: Config,
bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>,
monero_wallet: Arc<Mutex<monero::Wallet>>,
) -> (AliceApplicationHandle, Receiver<alice::Swap>) {
if let Some(parent_dir) = db_path.parent() {
ensure_directory_exists(parent_dir).unwrap();
@ -288,7 +289,7 @@ async fn init_test_wallets(
electrum_rpc_port: u16,
seed: &Seed,
env_config: Config,
) -> (Arc<bitcoin::Wallet>, Arc<monero::Wallet>) {
) -> (Arc<bitcoin::Wallet>, Arc<Mutex<monero::Wallet>>) {
monero
.init_wallet(
name,
@ -355,7 +356,7 @@ async fn init_test_wallets(
}
}
(Arc::new(btc_wallet), Arc::new(xmr_wallet))
(Arc::new(btc_wallet), Arc::new(Mutex::new(xmr_wallet)))
}
const MONERO_WALLET_NAME_BOB: &str = "bob";
@ -412,7 +413,7 @@ pub struct BobParams {
seed: Seed,
db_path: PathBuf,
bitcoin_wallet: Arc<bitcoin::Wallet>,
monero_wallet: Arc<monero::Wallet>,
monero_wallet: Arc<Mutex<monero::Wallet>>,
alice_address: Multiaddr,
alice_peer_id: PeerId,
env_config: Config,
@ -430,7 +431,7 @@ impl BobParams {
pub async fn get_change_receive_addresses(&self) -> (bitcoin::Address, monero::Address) {
(
self.bitcoin_wallet.new_address().await.unwrap(),
self.monero_wallet.get_main_address(),
self.monero_wallet.lock().await.get_main_address(),
)
}
@ -452,7 +453,7 @@ impl BobParams {
self.monero_wallet.clone(),
self.env_config,
handle,
self.monero_wallet.get_main_address(),
self.monero_wallet.lock().await.get_main_address(),
)
.await?;
@ -484,7 +485,7 @@ impl BobParams {
self.monero_wallet.clone(),
self.env_config,
handle,
self.monero_wallet.get_main_address(),
self.monero_wallet.lock().await.get_main_address(),
self.bitcoin_wallet.new_address().await?,
btc_amount,
);
@ -543,14 +544,14 @@ pub struct TestContext {
alice_starting_balances: StartingBalances,
alice_bitcoin_wallet: Arc<bitcoin::Wallet>,
alice_monero_wallet: Arc<monero::Wallet>,
alice_monero_wallet: Arc<Mutex<monero::Wallet>>,
alice_swap_handle: mpsc::Receiver<Swap>,
alice_handle: AliceApplicationHandle,
pub bob_params: BobParams,
bob_starting_balances: StartingBalances,
bob_bitcoin_wallet: Arc<bitcoin::Wallet>,
bob_monero_wallet: Arc<monero::Wallet>,
bob_monero_wallet: Arc<Mutex<monero::Wallet>>,
}
impl TestContext {
@ -626,7 +627,7 @@ impl TestContext {
.unwrap();
assert_eventual_balance(
self.alice_monero_wallet.as_ref(),
&*self.alice_monero_wallet.lock().await,
Ordering::Less,
self.alice_redeemed_xmr_balance(),
)
@ -647,7 +648,7 @@ impl TestContext {
// Alice pays fees - comparison does not take exact lock fee into account
assert_eventual_balance(
self.alice_monero_wallet.as_ref(),
&*self.alice_monero_wallet.lock().await,
Ordering::Greater,
self.alice_refunded_xmr_balance(),
)
@ -667,7 +668,7 @@ impl TestContext {
.unwrap();
assert_eventual_balance(
self.alice_monero_wallet.as_ref(),
&*self.alice_monero_wallet.lock().await,
Ordering::Less,
self.alice_punished_xmr_balance(),
)
@ -685,10 +686,10 @@ impl TestContext {
.unwrap();
// unload the generated wallet by opening the original wallet
self.bob_monero_wallet.re_open().await.unwrap();
self.bob_monero_wallet.lock().await.re_open().await.unwrap();
assert_eventual_balance(
self.bob_monero_wallet.as_ref(),
&*self.bob_monero_wallet.lock().await,
Ordering::Greater,
self.bob_redeemed_xmr_balance(),
)
@ -729,7 +730,7 @@ impl TestContext {
assert!(bob_cancelled_and_refunded);
assert_eventual_balance(
self.bob_monero_wallet.as_ref(),
&*self.bob_monero_wallet.lock().await,
Ordering::Equal,
self.bob_refunded_xmr_balance(),
)
@ -747,7 +748,7 @@ impl TestContext {
.unwrap();
assert_eventual_balance(
self.bob_monero_wallet.as_ref(),
&*self.bob_monero_wallet.lock().await,
Ordering::Equal,
self.bob_punished_xmr_balance(),
)