From 4de9917e68e8ac83b0b07d52d7e1905b77f8326f Mon Sep 17 00:00:00 2001 From: Binarybaron Date: Wed, 16 Apr 2025 12:55:29 +0200 Subject: [PATCH] make work --- swap/src/asb/event_loop.rs | 145 +++++++++++++++++++++++++++++++------ 1 file changed, 123 insertions(+), 22 deletions(-) diff --git a/swap/src/asb/event_loop.rs b/swap/src/asb/event_loop.rs index e880c870..5168a950 100644 --- a/swap/src/asb/event_loop.rs +++ b/swap/src/asb/event_loop.rs @@ -16,6 +16,7 @@ use futures::stream::{FuturesUnordered, StreamExt}; use libp2p::request_response::{OutboundFailure, OutboundRequestId, ResponseChannel}; use libp2p::swarm::SwarmEvent; use libp2p::{PeerId, Swarm}; +use moka::future::Cache; use rust_decimal::Decimal; use std::collections::HashMap; use std::convert::{Infallible, TryInto}; @@ -25,6 +26,11 @@ use std::time::Duration; use tokio::sync::{mpsc, oneshot}; use uuid::Uuid; +/// Simple unit struct to serve as a key for the quote cache. +/// Since all quotes are the same type currently, we can use a simple key. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +struct QuoteCacheKey; + #[allow(missing_debug_implementations)] pub struct EventLoop where @@ -39,6 +45,9 @@ where min_buy: bitcoin::Amount, max_buy: bitcoin::Amount, external_redeem_address: Option, + + /// Cache for quotes + quote_cache: Cache, Arc>>, swap_sender: mpsc::Sender, @@ -127,6 +136,12 @@ where let swap_channel = MpscChannels::default(); let (outgoing_transfer_proofs_sender, outgoing_transfer_proofs_requests) = tokio::sync::mpsc::unbounded_channel(); + + // --- Initialize moka::future::Cache --- + let quote_cache = Cache::builder() + .time_to_live(Duration::from_secs(120)) // 2 minutes TTL + .build(); // Builds a future::Cache + // --- End cache initialization --- let event_loop = EventLoop { swarm, @@ -139,6 +154,7 @@ where min_buy, max_buy, external_redeem_address, + quote_cache, recv_encrypted_signature: Default::default(), inflight_encrypted_signatures: Default::default(), outgoing_transfer_proofs_requests, @@ -232,17 +248,19 @@ where tracing::warn!(%peer, "Ignoring spot price request: {}", error); } SwarmEvent::Behaviour(OutEvent::QuoteRequested { channel, peer }) => { - let quote = match self.make_quote(self.min_buy, self.max_buy).await { - Ok(quote) => quote, + // --- Use the new caching function --- + match self.make_quote_or_use_cached().await { + Ok(quote) => { + if self.swarm.behaviour_mut().quote.send_response(channel, quote).is_err() { + tracing::debug!(%peer, "Failed to respond with quote"); + } + } Err(error) => { - tracing::warn!(%peer, "Failed to make quote: {:#}", error); + tracing::warn!(%peer, "Failed to make or retrieve quote: {:#}", error); continue; } - }; - - if self.swarm.behaviour_mut().quote.send_response(channel, quote).is_err() { - tracing::debug!(%peer, "Failed to respond with quote"); } + // --- End use of caching function --- } SwarmEvent::Behaviour(OutEvent::TransferProofAcknowledged { peer, id }) => { tracing::debug!(%peer, "Bob acknowledged transfer proof"); @@ -462,8 +480,95 @@ where } } + /// Get a quote from the cache or calculate a new one using moka::future::Cache + /// Stores the Result, Arc> in the cache. + async fn make_quote_or_use_cached(&self) -> Result { + let key = QuoteCacheKey; + + // Clone needed data for the async calculation block + let min_buy = self.min_buy; + let max_buy = self.max_buy; + let mut latest_rate = self.latest_rate.clone(); + let monero_wallet = self.monero_wallet.clone(); + + // get_with expects the future to return V, where V is our Result<..., Arc> + let cached_result: Result, Arc> = self.quote_cache.get_with(key, async move { + tracing::debug!("Cache miss or expired, calculating new quote result"); + + // Inner function to perform the calculation and return Result<..., anyhow::Error> + let calculation_result = async { + let ask_price = latest_rate + .latest_rate() + .context("Failed to get latest rate")? + .ask() + .context("Failed to compute asking price")?; + + let balance = monero_wallet.get_balance().await?; + let xmr_balance = Amount::from_piconero(balance.unlocked_balance); + + let max_bitcoin_for_monero = + xmr_balance + .max_bitcoin_for_price(ask_price) + .ok_or_else(|| { + anyhow!( + "Bitcoin price ({}) x Monero ({}) overflow", + ask_price, + xmr_balance + ) + })?; + + tracing::trace!(%ask_price, %xmr_balance, %max_bitcoin_for_monero, "Computed quote"); + + if min_buy > max_bitcoin_for_monero { + tracing::trace!( + "Your Monero balance is too low... Min: {}, Max possible: {}", + min_buy, max_bitcoin_for_monero + ); + return Ok(Arc::new(BidQuote { + price: ask_price, + min_quantity: bitcoin::Amount::ZERO, + max_quantity: bitcoin::Amount::ZERO, + })); + } + + if max_buy > max_bitcoin_for_monero { + tracing::trace!( + "Your Monero balance is too low... Max requested: {}, Max possible: {}", + max_buy, max_bitcoin_for_monero + ); + return Ok(Arc::new(BidQuote { + price: ask_price, + min_quantity: min_buy, + max_quantity: max_bitcoin_for_monero, + })); + } + + Ok(Arc::new(BidQuote { + price: ask_price, + min_quantity: min_buy, + max_quantity: max_buy, + })) + }.await; + + // Map the Result<..., anyhow::Error> to Result<..., Arc> for caching + calculation_result.map_err(Arc::new) + }).await; + + // The cached_result is the actual Result we stored. + // Now, convert it back to the expected return type Result + match cached_result { + Ok(bid_quote_arc) => Ok((*bid_quote_arc).clone()), // Clone the BidQuote out of the Arc + Err(error_arc) => { + // Clone the error message from the Arc + // We convert it back to a regular anyhow::Error + Err(anyhow::Error::msg(error_arc.to_string())) + } + } + } + + /// Original make_quote (potentially unused if all callers switch) async fn make_quote( - &mut self, + &mut self, // Note: might still need &mut if latest_rate() does min_buy: bitcoin::Amount, max_buy: bitcoin::Amount, ) -> Result { @@ -475,8 +580,6 @@ where .context("Failed to compute asking price")?; let balance = self.monero_wallet.get_balance().await?; - - // use unlocked monero balance for quote let xmr_balance = Amount::from_piconero(balance.unlocked_balance); let max_bitcoin_for_monero = @@ -493,11 +596,10 @@ where tracing::trace!(%ask_price, %xmr_balance, %max_bitcoin_for_monero, "Computed quote"); if min_buy > max_bitcoin_for_monero { - tracing::trace!( - "Your Monero balance is too low to initiate a swap, as your minimum swap amount is {}. You could at most swap {}", - min_buy, max_bitcoin_for_monero - ); - + tracing::trace!( + "Your Monero balance is too low... Min: {}, Max possible: {}", + min_buy, max_bitcoin_for_monero + ); return Ok(BidQuote { price: ask_price, min_quantity: bitcoin::Amount::ZERO, @@ -506,10 +608,10 @@ where } if max_buy > max_bitcoin_for_monero { - tracing::trace!( - "Your Monero balance is too low to initiate a swap with the maximum swap amount {} that you have specified in your config. You can at most swap {}", - max_buy, max_bitcoin_for_monero - ); + tracing::trace!( + "Your Monero balance is too low... Max requested: {}, Max possible: {}", + max_buy, max_bitcoin_for_monero + ); return Ok(BidQuote { price: ask_price, min_quantity: min_buy, @@ -524,12 +626,14 @@ where }) } + /// Removed cache invalidation logic from handle_execution_setup_done for now async fn handle_execution_setup_done( &mut self, bob_peer_id: PeerId, swap_id: Uuid, state3: State3, ) { + // Original logic without cache invalidation let handle = self.new_handle(bob_peer_id, swap_id); let initial_state = AliceState::Started { @@ -546,9 +650,6 @@ where swap_id, }; - // TODO: Consider adding separate components for start/resume of swaps - - // swaps save peer id so we can resume match self.db.insert_peer_id(swap_id, bob_peer_id).await { Ok(_) => { if let Err(error) = self.swap_sender.send(swap).await {