Use RwLock instead of Mutex to allow for parallel reads and add get_current_swap endpoint

This commit is contained in:
binarybaron 2023-08-11 15:29:59 +02:00
parent 849e6e7a14
commit ffbbe24010
2 changed files with 24 additions and 10 deletions

View file

@ -16,16 +16,16 @@ use std::cmp::min;
use std::convert::TryInto; use std::convert::TryInto;
use std::future::Future; use std::future::Future;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::{Arc};
use std::time::Duration; use std::time::Duration;
use structopt::lazy_static::lazy_static; use structopt::lazy_static::lazy_static;
use tokio::sync::broadcast::Receiver; use tokio::sync::broadcast::Receiver;
use tracing::{debug_span, Instrument}; use tracing::{debug_span, Instrument};
use uuid::Uuid; use uuid::Uuid;
use tokio::sync::Mutex; use tokio::sync::RwLock;
lazy_static! { lazy_static! {
static ref SWAP_MUTEX: Mutex<Option<Uuid>> = Mutex::new(None); static ref SWAP_LOCK: RwLock<Option<Uuid>> = RwLock::new(None);
} }
#[derive(PartialEq, Debug)] #[derive(PartialEq, Debug)]
@ -113,7 +113,8 @@ pub enum Method {
}, },
StartDaemon { StartDaemon {
server_address: Option<SocketAddr>, server_address: Option<SocketAddr>,
} },
GetCurrentSwap,
} }
impl Request { impl Request {
@ -556,7 +557,12 @@ impl Request {
Ok(json!({ Ok(json!({
"result": [] "result": []
})) }))
} },
Method::GetCurrentSwap => {
Ok(json!({
"swap_id": SWAP_LOCK.read().await.clone()
}))
},
} }
} }
@ -569,15 +575,16 @@ impl Request {
if let Some(swap_id) = self.has_lockable_swap_id() { if let Some(swap_id) = self.has_lockable_swap_id() {
println!("taking lock for swap_id: {}", swap_id); println!("taking lock for swap_id: {}", swap_id);
let mut guard = SWAP_MUTEX.try_lock().context("Another swap is already running")?; let mut guard = SWAP_LOCK.write().await;
if guard.is_some() { if let Some(running_swap_id) = guard.as_ref() {
bail!("Another swap is already running"); bail!("Another swap is already running: {}", running_swap_id);
} }
let _ = guard.insert(swap_id.clone()); let _ = guard.insert(swap_id.clone());
drop(guard);
let result = self.handle_cmd(context).instrument(call_span).await; let result = self.handle_cmd(context).instrument(call_span).await;
guard.take();
SWAP_LOCK.write().await.take();
println!("releasing lock for swap_id: {}", swap_id); println!("releasing lock for swap_id: {}", swap_id);

View file

@ -156,6 +156,9 @@ pub fn register_modules(context: Arc<Context>) -> RpcModule<Arc<Context>> {
list_sellers(rendezvous_point.clone(), &context).await list_sellers(rendezvous_point.clone(), &context).await
}) })
.expect("Could not register RPC method list_sellers"); .expect("Could not register RPC method list_sellers");
module.register_async_method("get_current_swap", |_, context| async move {
get_current_swap(&context).await
}).expect("Could not register RPC method get_current_swap");
module module
} }
@ -170,6 +173,10 @@ async fn execute_request(
.map_err(|err| jsonrpsee_core::Error::Custom(err.to_string())) .map_err(|err| jsonrpsee_core::Error::Custom(err.to_string()))
} }
async fn get_current_swap(context: &Arc<Context>) -> Result<serde_json::Value, jsonrpsee_core::Error> {
execute_request(Method::GetCurrentSwap, context).await
}
async fn get_bitcoin_balance( async fn get_bitcoin_balance(
context: &Arc<Context>, context: &Arc<Context>,
) -> Result<serde_json::Value, jsonrpsee_core::Error> { ) -> Result<serde_json::Value, jsonrpsee_core::Error> {