WIP: Struct for concurrent swaps manager

This commit is contained in:
binarybaron 2023-08-15 12:20:44 +02:00
parent ec65ea2b27
commit bbcfffab6d
12 changed files with 411 additions and 484 deletions

View file

@ -7,12 +7,12 @@ use crate::network::rendezvous::XmrBtcNamespace;
use crate::protocol::Database;
use crate::seed::Seed;
use crate::{bitcoin, cli, monero};
use anyhow::{Context as AnyContext, Result};
use anyhow::{bail, Context as AnyContext, Error, Result};
use std::fmt;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::{Arc, Once};
use tokio::sync::{broadcast, Mutex, broadcast::Receiver, broadcast::Sender};
use tokio::sync::{broadcast, broadcast::Receiver, broadcast::Sender, RwLock};
use url::Url;
static START: Once = Once::new();
@ -30,43 +30,72 @@ pub struct Config {
pub is_testnet: bool,
}
impl Shutdown {
pub fn new(listen: Receiver<()>) -> Shutdown {
let (notify, _) = broadcast::channel(16);
Shutdown {
shutdown: Mutex::new(false),
listen: Mutex::new(listen),
notify
use uuid::Uuid;
pub struct SwapLock {
current_swap: RwLock<Option<Uuid>>,
_suspension_rec: Receiver<()>,
suspension_trigger: Sender<()>,
}
impl SwapLock {
pub fn new() -> Self {
let (suspension_trigger, _suspension_rec) = broadcast::channel(10);
SwapLock {
current_swap: RwLock::new(None),
_suspension_rec,
suspension_trigger,
}
}
/// Receive the shutdown notice, waiting if necessary.
pub async fn recv(&self) {
// If the shutdown signal has already been received, then return
// immediately.
let mut guard_shutdown = self.shutdown.lock().await;
if *guard_shutdown {
return;
pub async fn listen_for_swap_force_suspension(&self) -> Result<(), Error> {
let mut listener = self.suspension_trigger.subscribe();
let event = listener.recv().await;
match event {
Ok(_) => Ok(()),
Err(e) => {
tracing::error!("Error receiving swap suspension signal: {}", e);
bail!(e)
}
}
}
pub async fn acquire_swap_lock(&self, swap_id: Uuid) -> Result<(), Error> {
let mut current_swap = self.current_swap.write().await;
if current_swap.is_some() {
bail!("There already exists an active swap lock");
}
let _ = self.listen.lock().await.recv().await;
tracing::debug!(swap_id = %swap_id, "Acquiring swap lock");
*current_swap = Some(swap_id);
Ok(())
}
// Remember that the signal has been received.
*guard_shutdown = true;
pub async fn get_current_swap_id(&self) -> Option<Uuid> {
let current_swap = self.current_swap.read().await.clone();
current_swap
}
// Send shutdown request to child tasks
pub async fn send_suspend_signal(&self) -> Result<(), Error> {
let _ = self.suspension_trigger.send(())?;
Ok(())
}
pub async fn release_swap_lock(&self) -> Result<Uuid, Error> {
let mut current_swap = self.current_swap.write().await;
if let Some(swap_id) = current_swap.as_ref() {
tracing::debug!(swap_id = %swap_id, "Releasing swap lock");
let prev_swap_id = swap_id.clone();
*current_swap = None;
drop(current_swap);
Ok(prev_swap_id)
} else {
bail!("There is no current swap lock to release");
}
}
}
#[derive(Debug)]
pub struct Shutdown {
shutdown: Mutex<bool>,
listen: Mutex<Receiver<()>>,
notify: Sender<()>,
}
// workaround for warning over monero_rpc_process which we must own but not read
#[allow(dead_code)]
pub struct Context {
@ -74,8 +103,8 @@ pub struct Context {
bitcoin_wallet: Option<Arc<bitcoin::Wallet>>,
monero_wallet: Option<Arc<monero::Wallet>>,
monero_rpc_process: Option<monero::WalletRpcProcess>,
swap_lock: Arc<SwapLock>,
pub config: Config,
pub shutdown: Arc<Shutdown>,
}
impl Context {
@ -88,7 +117,6 @@ impl Context {
debug: bool,
json: bool,
server_address: Option<SocketAddr>,
sender: broadcast::Sender<()>,
) -> Result<Context> {
let data_dir = data::data_dir_from(data, is_testnet)?;
let env_config = env_config_from(is_testnet);
@ -148,7 +176,7 @@ impl Context {
is_testnet,
data_dir,
},
shutdown: Arc::new(Shutdown::new(sender.subscribe())),
swap_lock: Arc::new(SwapLock::new()),
};
Ok(context)
@ -239,7 +267,7 @@ fn env_config_from(testnet: bool) -> EnvConfig {
#[cfg(test)]
pub mod api_test {
use super::*;
use crate::api::request::{Method, Params, Request};
use crate::api::request::{Method, Request};
use libp2p::Multiaddr;
use std::str::FromStr;