diff --git a/swap/src/asb/event_loop.rs b/swap/src/asb/event_loop.rs index c6924808..f9d9c3db 100644 --- a/swap/src/asb/event_loop.rs +++ b/swap/src/asb/event_loop.rs @@ -23,7 +23,8 @@ use std::convert::{Infallible, TryInto}; use std::fmt::Debug; use std::sync::Arc; use std::time::Duration; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot, Mutex}; +use tokio::time::timeout; use uuid::Uuid; /// The time-to-live for quotes in the cache @@ -44,7 +45,7 @@ where swarm: libp2p::Swarm>, env_config: env::Config, bitcoin_wallet: Arc, - monero_wallet: Arc, + monero_wallet: Arc>, db: Arc, latest_rate: LR, min_buy: bitcoin::Amount, @@ -131,7 +132,7 @@ where swarm: Swarm>, env_config: env::Config, bitcoin_wallet: Arc, - monero_wallet: Arc, + monero_wallet: Arc>, db: Arc, latest_rate: LR, min_buy: bitcoin::Amount, @@ -231,7 +232,7 @@ where } }; - let wallet_snapshot = match WalletSnapshot::capture(&self.bitcoin_wallet, &self.monero_wallet, &self.external_redeem_address, btc).await { + let wallet_snapshot = match WalletSnapshot::capture(&self.bitcoin_wallet, &*self.monero_wallet.lock().await, &self.external_redeem_address, btc).await { Ok(wallet_snapshot) => wallet_snapshot, Err(error) => { tracing::error!("Swap request will be ignored because we were unable to create wallet snapshot for swap: {:#}", error); @@ -516,6 +517,11 @@ where min_buy: bitcoin::Amount, max_buy: bitcoin::Amount, ) -> Result, Arc> { + /// This is how long we maximally wait for the wallet lock + /// -- else the quote will be out of date and we will return + /// an error. + const MAX_WAIT_DURATION: Duration = Duration::from_secs(60); + let ask_price = self .latest_rate .latest_rate() @@ -523,8 +529,9 @@ where .ask() .map_err(|e| Arc::new(e.context("Failed to compute asking price")))?; - let balance = self - .monero_wallet + let balance = timeout(MAX_WAIT_DURATION, self.monero_wallet.lock()) + .await + .context("Timeout while waiting for lock on monero wallet while making quote")? .get_balance() .await .map_err(|e| Arc::new(e.context("Failed to get Monero balance")))?; diff --git a/swap/src/asb/recovery/refund.rs b/swap/src/asb/recovery/refund.rs index 3067a8f6..162f2f6a 100644 --- a/swap/src/asb/recovery/refund.rs +++ b/swap/src/asb/recovery/refund.rs @@ -6,6 +6,7 @@ use anyhow::{bail, Result}; use libp2p::PeerId; use std::convert::TryInto; use std::sync::Arc; +use tokio::sync::Mutex; use uuid::Uuid; #[derive(Debug, thiserror::Error)] @@ -26,7 +27,7 @@ pub enum Error { pub async fn refund( swap_id: Uuid, bitcoin_wallet: Arc, - monero_wallet: Arc, + monero_wallet: Arc>, db: Arc, ) -> Result { let state = db.get_state(swap_id).await?.try_into()?; @@ -73,7 +74,7 @@ pub async fn refund( state3 .refund_xmr( - &monero_wallet, + monero_wallet.clone(), monero_wallet_restore_blockheight, swap_id.to_string(), spend_key, diff --git a/swap/src/bin/asb.rs b/swap/src/bin/asb.rs index 72a4e637..bc9335e1 100644 --- a/swap/src/bin/asb.rs +++ b/swap/src/bin/asb.rs @@ -122,11 +122,11 @@ pub async fn main() -> Result<()> { // Initialize Monero wallet let monero_wallet = init_monero_wallet(&config, env_config).await?; - let monero_address = monero_wallet.get_main_address(); + let monero_address = monero_wallet.lock().await.get_main_address(); tracing::info!(%monero_address, "Monero wallet address"); // Check Monero balance - let monero = monero_wallet.get_balance().await?; + let monero = monero_wallet.lock().await.get_balance().await?; match (monero.balance, monero.unlocked_balance) { (0, _) => { tracing::warn!( @@ -317,7 +317,7 @@ pub async fn main() -> Result<()> { } Command::Balance => { let monero_wallet = init_monero_wallet(&config, env_config).await?; - let monero_balance = monero_wallet.get_balance().await?; + let monero_balance = monero_wallet.lock().await.get_balance().await?; tracing::info!(%monero_balance); let bitcoin_wallet = init_bitcoin_wallet(&config, &seed, env_config).await?; @@ -419,7 +419,7 @@ async fn init_bitcoin_wallet( async fn init_monero_wallet( config: &Config, env_config: swap::env::Config, -) -> Result { +) -> Result> { tracing::debug!("Opening Monero wallet"); let wallet = monero::Wallet::open_or_create( config.monero.wallet_rpc_url.clone(), @@ -428,7 +428,7 @@ async fn init_monero_wallet( ) .await?; - Ok(wallet) + Ok(tokio::sync::Mutex::new(wallet)) } /// This struct is used to extract swap details from the database and print them in a table format diff --git a/swap/src/cli/api.rs b/swap/src/cli/api.rs index 5b364b39..8d765cb9 100644 --- a/swap/src/cli/api.rs +++ b/swap/src/cli/api.rs @@ -188,7 +188,7 @@ pub struct Context { pub tasks: Arc, tauri_handle: Option, bitcoin_wallet: Option>, - monero_wallet: Option>, + monero_wallet: Option>>, monero_rpc_process: Option>>, tor_client: Option>>, } @@ -387,7 +387,10 @@ impl ContextBuilder { ]), ); - Ok((Some(Arc::new(wlt)), Some(Arc::new(SyncMutex::new(prc))))) + Ok(( + Some(Arc::new(TokioMutex::new(wlt))), + Some(Arc::new(SyncMutex::new(prc))), + )) } None => Ok((None, None)), } @@ -482,7 +485,7 @@ impl Context { env_config: EnvConfig, db_path: PathBuf, bob_bitcoin_wallet: Arc, - bob_monero_wallet: Arc, + bob_monero_wallet: Arc>, ) -> Self { let config = Config::for_harness(seed, env_config); diff --git a/swap/src/cli/api/tauri_bindings.rs b/swap/src/cli/api/tauri_bindings.rs index fe313131..e76e5e01 100644 --- a/swap/src/cli/api/tauri_bindings.rs +++ b/swap/src/cli/api/tauri_bindings.rs @@ -133,7 +133,7 @@ impl TauriHandle { let request_id = Uuid::new_v4(); let now_secs = SystemTime::now() .duration_since(UNIX_EPOCH) - .expect("it is later than the begin of the unix epoch") + .expect("system time to be after unix epoch (1970-01-01)") .as_secs(); let expiration_ts = now_secs + timeout_secs; diff --git a/swap/src/monero/wallet.rs b/swap/src/monero/wallet.rs index 312611aa..4f19acd9 100644 --- a/swap/src/monero/wallet.rs +++ b/swap/src/monero/wallet.rs @@ -10,16 +10,26 @@ use std::future::Future; use std::ops::Div; use std::pin::Pin; use std::str::FromStr; +use std::sync::Arc; use std::time::Duration; use tokio::sync::Mutex; use tokio::time::Interval; use url::Url; +/// This is our connection to the monero blockchain which we use +/// all over the codebase, mostly as `Arc>`. +/// +/// It represents a connection to a monero-wallet-rpc daemon, +/// which can load a (single) wallet at a time. +/// This struct contains methods for opening, closing, creating +/// wallet and for sending funds from the loaded wallet. #[derive(Debug)] -pub struct Wallet { - inner: Mutex, +pub struct Wallet { + inner: C, network: Network, - name: String, + /// The file name of the main wallet (the first wallet loaded) + main_wallet: String, + /// The first address of the main wallet main_address: monero::Address, sync_interval: Duration, } @@ -31,7 +41,8 @@ impl Wallet { match client.open_wallet(name.clone()).await { Err(error) => { - tracing::debug!(%error, "Open wallet response error"); + tracing::debug!(%error, "Failed to open wallet, trying to create instead"); + client.create_wallet(name.clone(), "English".to_owned()).await.context( "Unable to create Monero wallet, please ensure that the monero-wallet-rpc is available", )?; @@ -50,32 +61,61 @@ impl Wallet { monero::Address::from_str(client.get_address(0).await?.address.as_str())?; Ok(Self { - inner: Mutex::new(client), + inner: client, network: env_config.monero_network, - name, + main_wallet: name, main_address, sync_interval: env_config.monero_sync_interval(), }) } - /// Re-open the wallet using the internally stored name. + /// This can be used to create dummy wallet for testing purposes. + /// Warning: filled with non-sense values, don't use for anything + /// but as a wrapper around your dummy client. + #[cfg(test)] + fn from_dummy + Sync>( + client: T, + network: Network, + ) -> Wallet { + // Here we make up some values just so we can use the wallet in tests + // Todo: verify this works + use curve25519_dalek::scalar::Scalar; + + let privkey = PrivateKey::from_scalar(Scalar::one()); + let pubkey = PublicKey::from_private_key(&privkey); + + Wallet { + inner: client, + network, + sync_interval: Duration::from_secs(100), + main_wallet: "foo".into(), + main_address: Address::standard(network, pubkey, pubkey), + } + } + + /// Re-open the internally stored wallet from it's file. pub async fn re_open(&self) -> Result<()> { - self.inner - .lock() + self.open(self.main_wallet.clone()) .await - .open_wallet(self.name.clone()) - .await?; + .context("Failed to re-open main wallet")?; + Ok(()) } + /// Open a monero wallet from a file. pub async fn open(&self, filename: String) -> Result<()> { - self.inner.lock().await.open_wallet(filename).await?; + self.inner + .open_wallet(filename) + .await + .context("Failed to open ")?; Ok(()) } /// Close the wallet and open (load) another wallet by generating it from /// keys. The generated wallet will remain loaded. - pub async fn create_from_and_load( + /// + /// If the wallet already exists, it will just be loaded instead. + pub async fn open_or_create_from_keys( &self, file_name: String, private_spend_key: PrivateKey, @@ -87,18 +127,18 @@ impl Wallet { let address = Address::standard(self.network, public_spend_key, public_view_key); - let wallet = self.inner.lock().await; - // Properly close the wallet before generating the other wallet to ensure that // it saves its state correctly - let _ = wallet + let _ = self + .inner .close_wallet() .await .context("Failed to close wallet")?; - let _ = wallet + let result = self + .inner .generate_from_keys( - file_name, + file_name.clone(), address.to_string(), private_spend_key.to_string(), PrivateKey::from(private_view_key).to_string(), @@ -106,25 +146,65 @@ impl Wallet { String::from(""), true, ) - .await - .context("Failed to generate new wallet from keys")?; + .await; - Ok(()) + // If we failed to create the wallet because it already exists, + // we just try to open it instead + match result { + Ok(_) => Ok(()), + Err(error) if error.to_string().contains("Wallet already exists") => { + tracing::debug!( + monero_wallet_name = &file_name, + "Cannot create wallet because it already exists, loading instead" + ); + + self.open(file_name) + .await + .context("Failed to create wallet from keys ('Wallet already exists'), subsequent attempt to open failed, too") + } + Err(error) => Err(error).context("Failed to create wallet from keys"), + } } - /// Close the wallet and open (load) another wallet by generating it from - /// keys. The generated wallet will be opened, all funds sweeped to the - /// main_address and then the wallet will be re-loaded using the internally - /// stored name. + /// A wrapper around [`create_from_keys_and_sweep_to`] that sweeps all funds to the + /// main address of the wallet. + /// For the ASB this is the main wallet. + /// For the CLI, I don't know which wallet it is. + /// + /// Returns the tx hashes of the sweep. pub async fn create_from_keys_and_sweep( &self, file_name: String, private_spend_key: PrivateKey, private_view_key: PrivateViewKey, restore_height: BlockHeight, - ) -> Result<()> { + ) -> Result> { + self.create_from_keys_and_sweep_to( + file_name, + private_spend_key, + private_view_key, + restore_height, + self.main_address, + ) + .await + } + + /// Close the wallet and open (load) another wallet by generating it from + /// keys. The generated wallet will be opened, all funds sweeped to the + /// specified destination address and then the original wallet will be re-loaded using the internally + /// stored name. + /// + /// Returns the tx hashes of the sweep. + pub async fn create_from_keys_and_sweep_to( + &self, + file_name: String, + private_spend_key: PrivateKey, + private_view_key: PrivateViewKey, + restore_height: BlockHeight, + destination_address: Address, + ) -> Result> { // Close the default wallet, generate the new wallet from the keys and load it - self.create_from_and_load( + self.open_or_create_from_keys( file_name, private_spend_key, private_view_key, @@ -133,48 +213,30 @@ impl Wallet { .await?; // Refresh the generated wallet - if let Err(error) = self.refresh(20).await { - return Err(anyhow::anyhow!(error) - .context("Failed to refresh generated wallet for sweeping to default wallet")); - } + self.refresh(20) + .await + .context("Failed to refresh generated wallet for sweeping to destination address")?; - // Sweep all the funds from the generated wallet to the default wallet + // Sweep all the funds from the generated wallet to the specified destination address let sweep_result = self - .inner - .lock() + .sweep_all(destination_address) .await - .sweep_all(self.main_address.to_string()) - .await; + .context("Failed to transfer Monero to destination address")?; - match sweep_result { - Ok(sweep_all) => { - for tx in sweep_all.tx_hash_list { - tracing::info!( - %tx, - monero_address = %self.main_address, - "Monero transferred back to default wallet"); - } - } - Err(error) => { - return Err( - anyhow::anyhow!(error).context("Failed to transfer Monero to default wallet") - ); - } + for tx in &sweep_result { + tracing::info!( + %tx, + monero_address = %destination_address, + "Monero transferred to destination address"); } - let _ = self - .inner - .lock() - .await - .open_wallet(self.name.clone()) - .await?; + self.re_open().await?; - Ok(()) + Ok(sweep_result) } + /// Transfer a specified amount of monero to a specified address. pub async fn transfer(&self, request: TransferRequest) -> Result { - let inner = self.inner.lock().await; - let TransferRequest { public_spend_key, public_view_key, @@ -184,7 +246,8 @@ impl Wallet { let destination_address = Address::standard(self.network, public_spend_key, public_view_key.into()); - let res = inner + let res = self + .inner .transfer_single(0, amount.as_piconero(), &destination_address.to_string()) .await?; @@ -202,60 +265,9 @@ impl Wallet { )) } - /// Wait until the specified transfer has been completed or failed. - pub async fn watch_for_transfer(&self, request: WatchRequest) -> Result<(), InsufficientFunds> { - self.watch_for_transfer_with(request, None).await - } - - /// Wait until the specified transfer has been completed or failed and listen to each new confirmation. - #[allow(clippy::too_many_arguments)] - pub async fn watch_for_transfer_with( - &self, - request: WatchRequest, - listener: Option, - ) -> Result<(), InsufficientFunds> { - let WatchRequest { - conf_target, - public_view_key, - public_spend_key, - transfer_proof, - expected, - } = request; - - let txid = transfer_proof.tx_hash(); - - tracing::info!( - %txid, - target_confirmations = %conf_target, - "Waiting for Monero transaction finality" - ); - - let address = Address::standard(self.network, public_spend_key, public_view_key.into()); - - let check_interval = tokio::time::interval(self.sync_interval.div(10)); - - wait_for_confirmations_with( - &self.inner, - transfer_proof, - address, - expected, - conf_target, - check_interval, - self.name.clone(), - listener, - ) - .await?; - - Ok(()) - } - + /// Send all funds from the currently loaded wallet to a specified address. pub async fn sweep_all(&self, address: Address) -> Result> { - let sweep_all = self - .inner - .lock() - .await - .sweep_all(address.to_string()) - .await?; + let sweep_all = self.inner.sweep_all(address.to_string()).await?; let tx_hashes = sweep_all.tx_hash_list.into_iter().map(TxHash).collect(); Ok(tx_hashes) @@ -263,11 +275,11 @@ impl Wallet { /// Get the balance of the primary account. pub async fn get_balance(&self) -> Result { - Ok(self.inner.lock().await.get_balance(0).await?) + Ok(self.inner.get_balance(0).await?) } pub async fn block_height(&self) -> Result { - Ok(self.inner.lock().await.get_height().await?) + Ok(self.inner.get_height().await?) } pub fn get_main_address(&self) -> Address { @@ -278,13 +290,13 @@ impl Wallet { const RETRY_INTERVAL: Duration = Duration::from_secs(1); for i in 1..=max_attempts { - tracing::info!(name = %self.name, attempt=i, "Syncing Monero wallet"); + tracing::info!(name = %self.main_wallet, attempt=i, "Syncing Monero wallet"); - let result = self.inner.lock().await.refresh().await; + let result = self.inner.refresh().await; match result { Ok(refreshed) => { - tracing::info!(name = %self.name, "Monero wallet synced"); + tracing::info!(name = %self.main_wallet, "Monero wallet synced"); return Ok(refreshed); } Err(error) => { @@ -293,15 +305,15 @@ impl Wallet { // We would not want to fail here if the height is not available // as it is not critical for the operation of the wallet. // We can just log a warning and continue. - let height = match self.inner.lock().await.get_height().await { + let height = match self.inner.get_height().await { Ok(height) => height.to_string(), Err(_) => { - tracing::warn!(name = %self.name, "Failed to fetch Monero wallet height during sync"); + tracing::warn!(name = %self.main_wallet, "Failed to fetch Monero wallet height during sync"); "unknown".to_string() } }; - tracing::warn!(attempt=i, %height, %attempts_left, name = %self.name, %error, "Failed to sync Monero wallet"); + tracing::warn!(attempt=i, %height, %attempts_left, name = %self.main_wallet, %error, "Failed to sync Monero wallet"); if attempts_left == 0 { return Err(error.into()); @@ -315,6 +327,61 @@ impl Wallet { } } +/// Wait until the specified transfer has been completed or failed. +pub async fn watch_for_transfer( + wallet: Arc>, + request: WatchRequest, +) -> Result<(), InsufficientFunds> { + watch_for_transfer_with(wallet, request, None).await +} + +/// Wait until the specified transfer has been completed or failed and listen to each new confirmation. +#[allow(clippy::too_many_arguments)] +pub async fn watch_for_transfer_with( + wallet: Arc>, + request: WatchRequest, + listener: Option, +) -> Result<(), InsufficientFunds> { + let WatchRequest { + conf_target, + public_view_key, + public_spend_key, + transfer_proof, + expected, + } = request; + + let txid = transfer_proof.tx_hash(); + + tracing::info!( + %txid, + target_confirmations = %conf_target, + "Waiting for Monero transaction finality" + ); + + let address = Address::standard( + wallet.lock().await.network, + public_spend_key, + public_view_key.into(), + ); + + let check_interval = tokio::time::interval(wallet.lock().await.sync_interval.div(10)); + let wallet_name = wallet.lock().await.main_wallet.clone(); + + wait_for_confirmations_with( + wallet.clone(), + transfer_proof, + address, + expected, + conf_target, + check_interval, + wallet_name, + listener, + ) + .await?; + + Ok(()) +} + #[derive(Debug)] pub struct TransferRequest { pub public_spend_key: PublicKey, @@ -331,6 +398,13 @@ pub struct WatchRequest { pub expected: Amount, } +/// This is a shorthand for the dynamic type we use to pass listeners to +/// i.e. the `wait_for_confirmations` function. It is basically +/// an `async fn` which takes a `u64` and returns nothing, but in dynamic. +/// +/// We use this to pass a listener that sends events to the tauri +/// frontend to show upates to the number of confirmations that +/// a tx has. type ConfirmationListener = Box Pin + Send + 'static>> + Send + 'static>; @@ -338,7 +412,7 @@ type ConfirmationListener = async fn wait_for_confirmations_with< C: monero_rpc::wallet::MoneroWalletRpc + Sync, >( - client: &Mutex, + wallet: Arc>>, transfer_proof: TransferProof, to_address: Address, expected: Amount, @@ -353,16 +427,21 @@ async fn wait_for_confirmations_with< check_interval.tick().await; // tick() at the beginning of the loop so every `continue` tick()s as well let txid = transfer_proof.tx_hash().to_string(); - let client = client.lock().await; - let tx = match client + // Make sure to drop the lock before matching on the result + // otherwise it will deadlock on the error code -13 case + let result = wallet + .lock() + .await + .inner .check_tx_key( txid.clone(), transfer_proof.tx_key.to_string(), to_address.to_string(), ) - .await - { + .await; + + let tx = match result { Ok(proof) => proof, Err(jsonrpc::Error::JsonRpc(jsonrpc::JsonRpcError { code: -1, @@ -381,7 +460,13 @@ async fn wait_for_confirmations_with< txid ); - if let Err(err) = client.open_wallet(wallet_name.clone()).await { + if let Err(err) = wallet + .lock() + .await + .inner + .open_wallet(wallet_name.clone()) + .await + { tracing::warn!( %err, "Failed to open wallet `{}` to continue monitoring of Monero transaction {}", @@ -437,12 +522,13 @@ mod tests { use crate::tracing_ext::capture_logs; use monero_rpc::wallet::CheckTxKey; use std::sync::atomic::{AtomicU32, Ordering}; + use tokio::sync::Mutex; use tracing::metadata::LevelFilter; async fn wait_for_confirmations< C: monero_rpc::wallet::MoneroWalletRpc + Sync, >( - client: &Mutex, + client: Arc>>, transfer_proof: TransferProof, to_address: Address, expected: Amount, @@ -465,13 +551,16 @@ mod tests { #[tokio::test] async fn given_exact_confirmations_does_not_fetch_tx_again() { - let client = Mutex::new(DummyClient::new(vec![Ok(CheckTxKey { - confirmations: 10, - received: 100, - })])); + let wallet = Arc::new(Mutex::new(Wallet::from_dummy( + DummyClient::new(vec![Ok(CheckTxKey { + confirmations: 10, + received: 100, + })]), + Network::Testnet, + ))); let result = wait_for_confirmations( - &client, + wallet.clone(), TransferProof::new(TxHash("".to_owned()), PrivateKey { scalar: crate::monero::Scalar::random(&mut rand::thread_rng()) }), @@ -485,9 +574,10 @@ mod tests { assert!(result.is_ok()); assert_eq!( - client + wallet .lock() .await + .inner .check_tx_key_invocations .load(Ordering::SeqCst), 1 @@ -498,31 +588,34 @@ mod tests { async fn visual_log_check() { let writer = capture_logs(LevelFilter::INFO); - let client = Mutex::new(DummyClient::new(vec![ - Ok(CheckTxKey { - confirmations: 1, - received: 100, - }), - Ok(CheckTxKey { - confirmations: 1, - received: 100, - }), - Ok(CheckTxKey { - confirmations: 1, - received: 100, - }), - Ok(CheckTxKey { - confirmations: 3, - received: 100, - }), - Ok(CheckTxKey { - confirmations: 5, - received: 100, - }), - ])); + let client = Arc::new(Mutex::new(Wallet::from_dummy( + DummyClient::new(vec![ + Ok(CheckTxKey { + confirmations: 1, + received: 100, + }), + Ok(CheckTxKey { + confirmations: 1, + received: 100, + }), + Ok(CheckTxKey { + confirmations: 1, + received: 100, + }), + Ok(CheckTxKey { + confirmations: 3, + received: 100, + }), + Ok(CheckTxKey { + confirmations: 5, + received: 100, + }), + ]), + Network::Testnet, + ))); wait_for_confirmations( - &client, + client.clone(), TransferProof::new(TxHash("".to_owned()), PrivateKey { scalar: crate::monero::Scalar::random(&mut rand::thread_rng()) }), @@ -548,28 +641,31 @@ mod tests { async fn reopens_wallet_in_case_not_available() { let writer = capture_logs(LevelFilter::DEBUG); - let client = Mutex::new(DummyClient::new(vec![ - Ok(CheckTxKey { - confirmations: 1, - received: 100, - }), - Ok(CheckTxKey { - confirmations: 1, - received: 100, - }), - Err((-13, "No wallet file".to_owned())), - Ok(CheckTxKey { - confirmations: 3, - received: 100, - }), - Ok(CheckTxKey { - confirmations: 5, - received: 100, - }), - ])); + let client = Arc::new(Mutex::new(Wallet::from_dummy( + DummyClient::new(vec![ + Ok(CheckTxKey { + confirmations: 1, + received: 100, + }), + Ok(CheckTxKey { + confirmations: 1, + received: 100, + }), + Err((-13, "No wallet file".to_owned())), + Ok(CheckTxKey { + confirmations: 3, + received: 100, + }), + Ok(CheckTxKey { + confirmations: 5, + received: 100, + }), + ]), + Network::Testnet, + ))); - wait_for_confirmations( - &client, + tokio::time::timeout(Duration::from_secs(30), wait_for_confirmations( + client.clone(), TransferProof::new(TxHash("".to_owned()), PrivateKey { scalar: crate::monero::Scalar::random(&mut rand::thread_rng()) }), @@ -578,8 +674,9 @@ mod tests { 5, tokio::time::interval(Duration::from_millis(10)), "foo-wallet".to_owned(), - ) + )) .await + .expect("timeout: shouldn't take more than 10 seconds") .unwrap(); assert_eq!( @@ -594,6 +691,7 @@ DEBUG swap::monero::wallet: No wallet loaded. Opening wallet `foo-wallet` to con client .lock() .await + .inner .open_wallet_invocations .load(Ordering::SeqCst), 1 diff --git a/swap/src/protocol/alice.rs b/swap/src/protocol/alice.rs index c6c21512..5b757aa2 100644 --- a/swap/src/protocol/alice.rs +++ b/swap/src/protocol/alice.rs @@ -4,6 +4,7 @@ use crate::env::Config; use crate::protocol::Database; use crate::{asb, bitcoin, monero}; use std::sync::Arc; +use tokio::sync::Mutex; use uuid::Uuid; pub use self::state::*; @@ -16,7 +17,7 @@ pub struct Swap { pub state: AliceState, pub event_loop_handle: asb::EventLoopHandle, pub bitcoin_wallet: Arc, - pub monero_wallet: Arc, + pub monero_wallet: Arc>, pub env_config: Config, pub swap_id: Uuid, pub db: Arc, diff --git a/swap/src/protocol/alice/state.rs b/swap/src/protocol/alice/state.rs index 74afe402..77406cf3 100644 --- a/swap/src/protocol/alice/state.rs +++ b/swap/src/protocol/alice/state.rs @@ -3,7 +3,7 @@ use crate::bitcoin::{ TxPunish, TxRedeem, TxRefund, Txid, }; use crate::env::Config; -use crate::monero::wallet::{TransferRequest, WatchRequest}; +use crate::monero::wallet::{watch_for_transfer, TransferRequest, WatchRequest}; use crate::monero::TransferProof; use crate::monero_ext::ScalarExt; use crate::protocol::{Message0, Message1, Message2, Message3, Message4, CROSS_CURVE_PROOF_SYSTEM}; @@ -14,6 +14,8 @@ use rand::{CryptoRng, RngCore}; use serde::{Deserialize, Serialize}; use sigma_fun::ext::dl_secp256k1_ed25519_eq::CrossCurveDLEQProof; use std::fmt; +use std::sync::Arc; +use tokio::sync::Mutex; use uuid::Uuid; #[derive(Debug, Clone, PartialEq)] @@ -507,7 +509,7 @@ impl State3 { pub async fn refund_xmr( &self, - monero_wallet: &monero::Wallet, + monero_wallet: Arc>, monero_wallet_restore_blockheight: BlockHeight, file_name: String, spend_key: monero::PrivateKey, @@ -516,12 +518,19 @@ impl State3 { let view_key = self.v; // Ensure that the XMR to be refunded are spendable by awaiting 10 confirmations - // on the lock transaction - monero_wallet - .watch_for_transfer(self.lock_xmr_watch_request(transfer_proof, 10)) - .await?; + // on the lock transaction. + // We pass Mutex instead of a &mut Wallet to + // enable releasing the lock and avoid starving other tasks while waiting + // for the confirmations. + watch_for_transfer( + monero_wallet.clone(), + self.lock_xmr_watch_request(transfer_proof, 10), + ) + .await?; monero_wallet + .lock() + .await .create_from_keys_and_sweep( file_name, spend_key, diff --git a/swap/src/protocol/alice/swap.rs b/swap/src/protocol/alice/swap.rs index 5813bbf3..a6cf5712 100644 --- a/swap/src/protocol/alice/swap.rs +++ b/swap/src/protocol/alice/swap.rs @@ -1,5 +1,6 @@ //! Run an XMR/BTC swap in the role of Alice. //! Alice holds XMR and wishes receive BTC. +use std::sync::Arc; use std::time::Duration; use crate::asb::{EventLoopHandle, LatestRate}; @@ -10,6 +11,7 @@ use crate::{bitcoin, monero}; use ::bitcoin::consensus::encode::serialize_hex; use anyhow::{bail, Context, Result}; use tokio::select; +use tokio::sync::Mutex; use tokio::time::timeout; use uuid::Uuid; @@ -37,7 +39,7 @@ where current_state, &mut swap.event_loop_handle, swap.bitcoin_wallet.as_ref(), - swap.monero_wallet.as_ref(), + swap.monero_wallet.clone(), &swap.env_config, rate_service.clone(), ) @@ -56,7 +58,7 @@ async fn next_state( state: AliceState, event_loop_handle: &mut EventLoopHandle, bitcoin_wallet: &bitcoin::Wallet, - monero_wallet: &monero::Wallet, + monero_wallet: Arc>, env_config: &Config, mut rate_service: LR, ) -> Result @@ -135,6 +137,7 @@ where // Record the current monero wallet block height so we don't have to scan from // block 0 for scenarios where we create a refund wallet. let monero_wallet_restore_blockheight = monero_wallet + .lock().await .block_height() .await .context("Failed to get Monero wallet block height") @@ -142,6 +145,7 @@ where // Lock the Monero monero_wallet + .lock().await .transfer(state3.lock_xmr_transfer_request()) .await .map(|proof| Some((monero_wallet_restore_blockheight, proof))) @@ -184,15 +188,17 @@ where state3, } => match state3.expired_timelocks(bitcoin_wallet).await? { ExpiredTimelocks::None { .. } => { - monero_wallet - .watch_for_transfer(state3.lock_xmr_watch_request(transfer_proof.clone(), 1)) - .await - .with_context(|| { - format!( - "Failed to watch for transfer of XMR in transaction {}", - transfer_proof.tx_hash() - ) - })?; + monero::wallet::watch_for_transfer( + monero_wallet.clone(), + state3.lock_xmr_watch_request(transfer_proof.clone(), 1), + ) + .await + .with_context(|| { + format!( + "Failed to watch for transfer of XMR in transaction {}", + transfer_proof.tx_hash() + ) + })?; AliceState::XmrLocked { monero_wallet_restore_blockheight, @@ -438,7 +444,7 @@ where || async { state3 .refund_xmr( - monero_wallet, + monero_wallet.clone(), monero_wallet_restore_blockheight, swap_id.to_string(), spend_key, diff --git a/swap/src/protocol/bob.rs b/swap/src/protocol/bob.rs index 97a20aa3..2f00b435 100644 --- a/swap/src/protocol/bob.rs +++ b/swap/src/protocol/bob.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use anyhow::Result; +use tokio::sync::Mutex; use uuid::Uuid; use crate::cli::api::tauri_bindings::TauriHandle; @@ -19,7 +20,7 @@ pub struct Swap { pub event_loop_handle: cli::EventLoopHandle, pub db: Arc, pub bitcoin_wallet: Arc, - pub monero_wallet: Arc, + pub monero_wallet: Arc>, pub env_config: env::Config, pub id: Uuid, pub monero_receive_address: monero::Address, @@ -32,7 +33,7 @@ impl Swap { db: Arc, id: Uuid, bitcoin_wallet: Arc, - monero_wallet: Arc, + monero_wallet: Arc>, env_config: env::Config, event_loop_handle: cli::EventLoopHandle, monero_receive_address: monero::Address, @@ -60,7 +61,7 @@ impl Swap { db: Arc, id: Uuid, bitcoin_wallet: Arc, - monero_wallet: Arc, + monero_wallet: Arc>, env_config: env::Config, event_loop_handle: cli::EventLoopHandle, monero_receive_address: monero::Address, diff --git a/swap/src/protocol/bob/state.rs b/swap/src/protocol/bob/state.rs index 1e6425a5..e2c1d111 100644 --- a/swap/src/protocol/bob/state.rs +++ b/swap/src/protocol/bob/state.rs @@ -652,6 +652,7 @@ impl State5 { pub fn tx_lock_id(&self) -> bitcoin::Txid { self.tx_lock.txid() } + pub async fn redeem_xmr( &self, monero_wallet: &monero::Wallet, @@ -661,34 +662,17 @@ impl State5 { let (spend_key, view_key) = self.xmr_keys(); tracing::info!(%wallet_file_name, "Generating and opening Monero wallet from the extracted keys to redeem the Monero"); - if let Err(e) = monero_wallet - .create_from_and_load( + + let tx_hashes = monero_wallet + .create_from_keys_and_sweep_to( wallet_file_name.clone(), spend_key, view_key, self.monero_wallet_restore_blockheight, + monero_receive_address, ) .await - { - // In case we failed to refresh/sweep, when resuming the wallet might already - // exist! This is a very unlikely scenario, but if we don't take care of it we - // might not be able to ever transfer the Monero. - tracing::warn!("Failed to generate monero wallet from keys: {:#}", e); - tracing::info!(%wallet_file_name, - "Falling back to trying to open the wallet if it already exists", - ); - monero_wallet.open(wallet_file_name).await?; - } - - // Ensure that the generated wallet is synced so we have a proper balance - monero_wallet.refresh(20).await?; - - // Sweep (transfer all funds) to the Bobs Monero redeem address - let tx_hashes = monero_wallet.sweep_all(monero_receive_address).await?; - - for tx_hash in &tx_hashes { - tracing::info!(%monero_receive_address, txid=%tx_hash.0, "Successfully transferred XMR to wallet"); - } + .context("Failed to redeem Monero")?; Ok(tx_hashes) } diff --git a/swap/src/protocol/bob/swap.rs b/swap/src/protocol/bob/swap.rs index d262533b..49103c7b 100644 --- a/swap/src/protocol/bob/swap.rs +++ b/swap/src/protocol/bob/swap.rs @@ -13,6 +13,7 @@ use crate::{bitcoin, monero}; use anyhow::{bail, Context as AnyContext, Result}; use std::sync::Arc; use tokio::select; +use tokio::sync::Mutex; use uuid::Uuid; const PRE_BTC_LOCK_APPROVAL_TIMEOUT_SECS: u64 = 120; @@ -69,7 +70,7 @@ pub async fn run_until( &mut swap.event_loop_handle, swap.db.clone(), swap.bitcoin_wallet.as_ref(), - swap.monero_wallet.as_ref(), + swap.monero_wallet.clone(), swap.monero_receive_address, swap.event_emitter.clone(), ) @@ -96,7 +97,7 @@ async fn next_state( event_loop_handle: &mut EventLoopHandle, db: Arc, bitcoin_wallet: &bitcoin::Wallet, - monero_wallet: &monero::Wallet, + monero_wallet: Arc>, monero_receive_address: monero::Address, event_emitter: Option, ) -> Result { @@ -148,7 +149,8 @@ async fn next_state( // If the Monero transaction gets confirmed before Bob comes online again then // Bob would record a wallet-height that is past the lock transaction height, // which can lead to the wallet not detect the transaction. - let monero_wallet_restore_blockheight = monero_wallet.block_height().await?; + let monero_wallet_restore_blockheight = + monero_wallet.lock().await.block_height().await?; let xmr_receive_amount = state2.xmr; @@ -324,7 +326,8 @@ async fn next_state( let watch_request = state.lock_xmr_watch_request(lock_transfer_proof); // We pass a listener to the function that get's called everytime a new confirmation is spotted. - let watch_future = monero_wallet.watch_for_transfer_with( + let watch_future = monero::wallet::watch_for_transfer_with( + monero_wallet.clone(), watch_request, Some(Box::new(move |confirmations| { // Clone them again so that we can move them again @@ -442,7 +445,11 @@ async fn next_state( event_emitter.emit_swap_progress_event(swap_id, TauriSwapProgressEvent::BtcRedeemed); let xmr_redeem_txids = state - .redeem_xmr(monero_wallet, swap_id.to_string(), monero_receive_address) + .redeem_xmr( + &*monero_wallet.lock().await, + swap_id.to_string(), + monero_receive_address, + ) .await?; event_emitter.emit_swap_progress_event( @@ -542,7 +549,11 @@ async fn next_state( let state5 = state.attempt_cooperative_redeem(s_a); match state5 - .redeem_xmr(monero_wallet, swap_id.to_string(), monero_receive_address) + .redeem_xmr( + &*monero_wallet.lock().await, + swap_id.to_string(), + monero_receive_address, + ) .await { Ok(xmr_redeem_txids) => { diff --git a/swap/tests/harness/mod.rs b/swap/tests/harness/mod.rs index c086e36a..2f961bf1 100644 --- a/swap/tests/harness/mod.rs +++ b/swap/tests/harness/mod.rs @@ -32,6 +32,7 @@ use testcontainers::clients::Cli; use testcontainers::{Container, RunnableImage}; use tokio::sync::mpsc; use tokio::sync::mpsc::Receiver; +use tokio::sync::Mutex; use tokio::task::JoinHandle; use tokio::time::{interval, timeout}; use tracing_subscriber::util::SubscriberInitExt; @@ -224,7 +225,7 @@ async fn start_alice( listen_address: Multiaddr, env_config: Config, bitcoin_wallet: Arc, - monero_wallet: Arc, + monero_wallet: Arc>, ) -> (AliceApplicationHandle, Receiver) { if let Some(parent_dir) = db_path.parent() { ensure_directory_exists(parent_dir).unwrap(); @@ -288,7 +289,7 @@ async fn init_test_wallets( electrum_rpc_port: u16, seed: &Seed, env_config: Config, -) -> (Arc, Arc) { +) -> (Arc, Arc>) { monero .init_wallet( name, @@ -355,7 +356,7 @@ async fn init_test_wallets( } } - (Arc::new(btc_wallet), Arc::new(xmr_wallet)) + (Arc::new(btc_wallet), Arc::new(Mutex::new(xmr_wallet))) } const MONERO_WALLET_NAME_BOB: &str = "bob"; @@ -412,7 +413,7 @@ pub struct BobParams { seed: Seed, db_path: PathBuf, bitcoin_wallet: Arc, - monero_wallet: Arc, + monero_wallet: Arc>, alice_address: Multiaddr, alice_peer_id: PeerId, env_config: Config, @@ -430,7 +431,7 @@ impl BobParams { pub async fn get_change_receive_addresses(&self) -> (bitcoin::Address, monero::Address) { ( self.bitcoin_wallet.new_address().await.unwrap(), - self.monero_wallet.get_main_address(), + self.monero_wallet.lock().await.get_main_address(), ) } @@ -452,7 +453,7 @@ impl BobParams { self.monero_wallet.clone(), self.env_config, handle, - self.monero_wallet.get_main_address(), + self.monero_wallet.lock().await.get_main_address(), ) .await?; @@ -484,7 +485,7 @@ impl BobParams { self.monero_wallet.clone(), self.env_config, handle, - self.monero_wallet.get_main_address(), + self.monero_wallet.lock().await.get_main_address(), self.bitcoin_wallet.new_address().await?, btc_amount, ); @@ -543,14 +544,14 @@ pub struct TestContext { alice_starting_balances: StartingBalances, alice_bitcoin_wallet: Arc, - alice_monero_wallet: Arc, + alice_monero_wallet: Arc>, alice_swap_handle: mpsc::Receiver, alice_handle: AliceApplicationHandle, pub bob_params: BobParams, bob_starting_balances: StartingBalances, bob_bitcoin_wallet: Arc, - bob_monero_wallet: Arc, + bob_monero_wallet: Arc>, } impl TestContext { @@ -626,7 +627,7 @@ impl TestContext { .unwrap(); assert_eventual_balance( - self.alice_monero_wallet.as_ref(), + &*self.alice_monero_wallet.lock().await, Ordering::Less, self.alice_redeemed_xmr_balance(), ) @@ -647,7 +648,7 @@ impl TestContext { // Alice pays fees - comparison does not take exact lock fee into account assert_eventual_balance( - self.alice_monero_wallet.as_ref(), + &*self.alice_monero_wallet.lock().await, Ordering::Greater, self.alice_refunded_xmr_balance(), ) @@ -667,7 +668,7 @@ impl TestContext { .unwrap(); assert_eventual_balance( - self.alice_monero_wallet.as_ref(), + &*self.alice_monero_wallet.lock().await, Ordering::Less, self.alice_punished_xmr_balance(), ) @@ -685,10 +686,10 @@ impl TestContext { .unwrap(); // unload the generated wallet by opening the original wallet - self.bob_monero_wallet.re_open().await.unwrap(); + self.bob_monero_wallet.lock().await.re_open().await.unwrap(); assert_eventual_balance( - self.bob_monero_wallet.as_ref(), + &*self.bob_monero_wallet.lock().await, Ordering::Greater, self.bob_redeemed_xmr_balance(), ) @@ -729,7 +730,7 @@ impl TestContext { assert!(bob_cancelled_and_refunded); assert_eventual_balance( - self.bob_monero_wallet.as_ref(), + &*self.bob_monero_wallet.lock().await, Ordering::Equal, self.bob_refunded_xmr_balance(), ) @@ -747,7 +748,7 @@ impl TestContext { .unwrap(); assert_eventual_balance( - self.bob_monero_wallet.as_ref(), + &*self.bob_monero_wallet.lock().await, Ordering::Equal, self.bob_punished_xmr_balance(), )