initiating swaps in a separate task and handling shutdown signals with broadcast queues

This commit is contained in:
Lorenzo Tucci 2023-08-14 11:31:54 +02:00
commit 969c58e987
12 changed files with 353 additions and 283 deletions

View file

@ -1,4 +1,4 @@
use crate::api::request::{Method, Params, Request};
use crate::api::request::{Method, Request};
use crate::api::Context;
use crate::bitcoin::bitcoin_address;
use crate::monero::monero_address;
@ -18,19 +18,19 @@ pub fn register_modules(context: Arc<Context>) -> RpcModule<Arc<Context>> {
.register_async_method("get_bitcoin_balance", |_, context| async move {
get_bitcoin_balance(&context).await
})
.expect("Could not register RPC method get_bitcoin_balance");
.unwrap();
module
.register_async_method("get_history", |_, context| async move {
get_history(&context).await
})
.expect("Could not register RPC method get_history");
.unwrap();
module
.register_async_method("get_raw_history", |_, context| async move {
get_raw_history(&context).await
})
.expect("Could not register RPC method get_raw_history");
.unwrap();
module
.register_async_method("get_seller", |params, context| async move {
@ -42,7 +42,7 @@ pub fn register_modules(context: Arc<Context>) -> RpcModule<Arc<Context>> {
get_seller(*swap_id, &context).await
})
.expect("Could not register RPC method get_seller");
.unwrap();
module
.register_async_method("get_swap_start_date", |params, context| async move {
@ -54,7 +54,7 @@ pub fn register_modules(context: Arc<Context>) -> RpcModule<Arc<Context>> {
get_swap_start_date(*swap_id, &context).await
})
.expect("Could not register RPC method get_swap_start_date");
.unwrap();
module
.register_async_method("resume_swap", |params, context| async move {
@ -66,7 +66,18 @@ pub fn register_modules(context: Arc<Context>) -> RpcModule<Arc<Context>> {
resume_swap(*swap_id, &context).await
})
.expect("Could not register RPC method resume_swap");
.unwrap();
module.register_async_method("get_swap_expired_timelock", |params, context| async move {
let params: HashMap<String, Uuid> = params.parse()?;
let swap_id = params.get("swap_id").ok_or_else(|| {
jsonrpsee_core::Error::Custom("Does not contain swap_id".to_string())
})?;
get_swap_timelock(*swap_id, &context).await
}).unwrap();
module
.register_async_method("cancel_refund_swap", |params, context| async move {
let params: HashMap<String, Uuid> = params.parse()?;
@ -77,7 +88,7 @@ pub fn register_modules(context: Arc<Context>) -> RpcModule<Arc<Context>> {
cancel_and_refund_swap(*swap_id, &context).await
})
.expect("Could not register RPC method cancel_refund_swap");
.unwrap();
module
.register_async_method("withdraw_btc", |params, context| async move {
let params: HashMap<String, String> = params.parse()?;
@ -145,7 +156,7 @@ pub fn register_modules(context: Arc<Context>) -> RpcModule<Arc<Context>> {
)
.await
})
.expect("Could not register RPC method buy_xmr");
.unwrap();
module
.register_async_method("list_sellers", |params, context| async move {
let params: HashMap<String, Multiaddr> = params.parse()?;
@ -155,93 +166,99 @@ pub fn register_modules(context: Arc<Context>) -> RpcModule<Arc<Context>> {
list_sellers(rendezvous_point.clone(), &context).await
})
.expect("Could not register RPC method list_sellers");
.unwrap();
module.register_async_method("get_current_swap", |_, context| async move {
get_current_swap(&context).await
}).unwrap();
module
}
async fn execute_request(
cmd: Method,
params: Params,
context: &Arc<Context>,
) -> Result<serde_json::Value, jsonrpsee_core::Error> {
let mut request = Request::new(cmd, params);
let request = Request::new(cmd);
request
.call(Arc::clone(context))
.await
.map_err(|err| jsonrpsee_core::Error::Custom(err.to_string()))
}
async fn get_current_swap(context: &Arc<Context>) -> Result<serde_json::Value, jsonrpsee_core::Error> {
execute_request(Method::GetCurrentSwap, context).await
}
async fn get_bitcoin_balance(
context: &Arc<Context>,
) -> Result<serde_json::Value, jsonrpsee_core::Error> {
execute_request(Method::Balance, Params::default(), context).await
execute_request(Method::Balance, context).await
}
async fn get_history(context: &Arc<Context>) -> Result<serde_json::Value, jsonrpsee_core::Error> {
execute_request(Method::History, Params::default(), context).await
execute_request(Method::History, context).await
}
async fn get_raw_history(
context: &Arc<Context>,
) -> Result<serde_json::Value, jsonrpsee_core::Error> {
execute_request(Method::RawHistory, Params::default(), context).await
execute_request(Method::RawHistory, context).await
}
async fn get_seller(
swap_id: Uuid,
context: &Arc<Context>,
) -> Result<serde_json::Value, jsonrpsee_core::Error> {
let params = Params {
swap_id: Some(swap_id),
..Default::default()
};
execute_request(Method::GetSeller, params, context).await
execute_request(Method::GetSeller {
swap_id
}, context).await
}
async fn get_swap_start_date(
swap_id: Uuid,
context: &Arc<Context>,
) -> Result<serde_json::Value, jsonrpsee_core::Error> {
let params = Params {
swap_id: Some(swap_id),
..Default::default()
};
execute_request(Method::SwapStartDate, params, context).await
execute_request(Method::SwapStartDate {
swap_id
}, context).await
}
async fn resume_swap(
swap_id: Uuid,
context: &Arc<Context>,
) -> Result<serde_json::Value, jsonrpsee_core::Error> {
let params = Params {
swap_id: Some(swap_id),
..Default::default()
};
execute_request(Method::Resume, params, context).await
execute_request(Method::Resume {
swap_id
}, context).await
}
async fn get_swap_timelock(
swap_id: Uuid,
context: &Arc<Context>,
) -> Result<serde_json::Value, jsonrpsee_core::Error> {
execute_request(Method::GetSwapExpiredTimelock {
swap_id
}, context).await
}
async fn cancel_and_refund_swap(
swap_id: Uuid,
context: &Arc<Context>,
) -> Result<serde_json::Value, jsonrpsee_core::Error> {
let params = Params {
swap_id: Some(swap_id),
..Default::default()
};
execute_request(Method::CancelAndRefund, params, context).await
execute_request(Method::CancelAndRefund {
swap_id
}, context).await
}
async fn withdraw_btc(
withdraw_address: bitcoin::Address,
address: bitcoin::Address,
amount: Option<bitcoin::Amount>,
context: &Arc<Context>,
) -> Result<serde_json::Value, jsonrpsee_core::Error> {
let params = Params {
execute_request(Method::WithdrawBtc {
amount,
address: Some(withdraw_address),
..Default::default()
};
execute_request(Method::WithdrawBtc, params, context).await
address,
}, context).await
}
async fn buy_xmr(
@ -250,24 +267,19 @@ async fn buy_xmr(
seller: Multiaddr,
context: &Arc<Context>,
) -> Result<serde_json::Value, jsonrpsee_core::Error> {
let params = Params {
bitcoin_change_address: Some(bitcoin_change_address),
monero_receive_address: Some(monero_receive_address),
seller: Some(seller),
swap_id: Some(Uuid::new_v4()),
..Default::default()
};
execute_request(Method::BuyXmr, params, context).await
execute_request(Method::BuyXmr {
seller,
swap_id: Uuid::new_v4(),
bitcoin_change_address,
monero_receive_address
}, context).await
}
async fn list_sellers(
rendezvous_point: Multiaddr,
context: &Arc<Context>,
) -> Result<serde_json::Value, jsonrpsee_core::Error> {
let params = Params {
rendezvous_point: Some(rendezvous_point),
..Default::default()
};
execute_request(Method::ListSellers, params, context).await
execute_request(Method::ListSellers {
rendezvous_point
}, context).await
}