From aa74b1ff4c24bc2e43157b64b8181c3c55d6f392 Mon Sep 17 00:00:00 2001 From: binarybaron <86064887+binarybaron@users.noreply.github.com> Date: Mon, 28 Aug 2023 10:28:13 +0200 Subject: [PATCH] Wait for swap to be suspended after sending signal --- swap/src/api.rs | 34 +++++++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/swap/src/api.rs b/swap/src/api.rs index 6dbf14f2..5e343115 100644 --- a/swap/src/api.rs +++ b/swap/src/api.rs @@ -12,7 +12,7 @@ use std::fmt; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::{Arc, Once}; -use tokio::sync::{broadcast, broadcast::Receiver, broadcast::Sender, RwLock}; +use tokio::sync::{broadcast, broadcast::Sender, RwLock}; use url::Url; static START: Once = Once::new(); @@ -34,16 +34,14 @@ use uuid::Uuid; pub struct SwapLock { current_swap: RwLock>, - _suspension_rec: Receiver<()>, suspension_trigger: Sender<()>, } impl SwapLock { pub fn new() -> Self { - let (suspension_trigger, _suspension_rec) = broadcast::channel(10); + let (suspension_trigger, _) = broadcast::channel(10); SwapLock { current_swap: RwLock::new(None), - _suspension_rec, suspension_trigger, } } @@ -76,9 +74,35 @@ impl SwapLock { current_swap } + /// Sends a signal to suspend all ongoing swap processes. + /// + /// This function performs the following steps: + /// 1. Triggers the suspension by sending a unit `()` signal to all listeners via `self.suspension_trigger`. + /// 2. Polls the `current_swap` state every 50 milliseconds to check if it has been set to `None`, indicating that the swap processes have been suspended and the lock released. + /// 3. If the lock is not released within 10 seconds, the function returns an error. + /// + /// If we send a suspend signal while no swap is in progress, the function will not fail, but will return immediately. + /// + /// # Returns + /// - `Ok(())` if the swap lock is successfully released. + /// - `Err(Error)` if the function times out waiting for the swap lock to be released. + /// + /// # Notes + /// The 50ms polling interval is considered negligible overhead compared to the typical time required to suspend ongoing swap processes. pub async fn send_suspend_signal(&self) -> Result<(), Error> { + const TIMEOUT: u64 = 10_000; + const INTERVAL: u64 = 50; + let _ = self.suspension_trigger.send(())?; - Ok(()) + + for _ in 0..(TIMEOUT / INTERVAL) { + if self.get_current_swap_id().await.is_none() { + return Ok(()); + } + tokio::time::sleep(tokio::time::Duration::from_millis(INTERVAL)).await; + } + + bail!("Timed out waiting for swap lock to be released"); } pub async fn release_swap_lock(&self) -> Result {