mirror of
https://github.com/comit-network/xmr-btc-swap.git
synced 2025-08-07 14:02:32 -04:00
initiating swaps in a separate task and handling shutdown signals with broadcast queues
This commit is contained in:
parent
63c1edbdd3
commit
6a9f72a857
5 changed files with 87 additions and 84 deletions
|
@ -12,7 +12,7 @@ use std::fmt;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::{Arc, Once};
|
use std::sync::{Arc, Once};
|
||||||
use tokio::sync::{broadcast, Mutex};
|
use tokio::sync::{broadcast, Mutex, broadcast::Receiver, broadcast::Sender};
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
static START: Once = Once::new();
|
static START: Once = Once::new();
|
||||||
|
@ -30,6 +30,43 @@ pub struct Config {
|
||||||
pub is_testnet: bool,
|
pub is_testnet: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Shutdown {
|
||||||
|
pub fn new(listen: Receiver<()>) -> Shutdown {
|
||||||
|
let (notify, _) = broadcast::channel(16);
|
||||||
|
Shutdown {
|
||||||
|
shutdown: Mutex::new(false),
|
||||||
|
listen: Mutex::new(listen),
|
||||||
|
notify
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Receive the shutdown notice, waiting if necessary.
|
||||||
|
pub async fn recv(&self) {
|
||||||
|
// If the shutdown signal has already been received, then return
|
||||||
|
// immediately.
|
||||||
|
let mut guard_shutdown = self.shutdown.lock().await;
|
||||||
|
if *guard_shutdown {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let _ = self.listen.lock().await.recv().await;
|
||||||
|
|
||||||
|
// Remember that the signal has been received.
|
||||||
|
*guard_shutdown = true;
|
||||||
|
|
||||||
|
// Send shutdown request to child tasks
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Shutdown {
|
||||||
|
shutdown: Mutex<bool>,
|
||||||
|
listen: Mutex<Receiver<()>>,
|
||||||
|
notify: Sender<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// workaround for warning over monero_rpc_process which we must own but not read
|
// workaround for warning over monero_rpc_process which we must own but not read
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub struct Context {
|
pub struct Context {
|
||||||
|
@ -37,9 +74,8 @@ pub struct Context {
|
||||||
bitcoin_wallet: Option<Arc<bitcoin::Wallet>>,
|
bitcoin_wallet: Option<Arc<bitcoin::Wallet>>,
|
||||||
monero_wallet: Option<Arc<monero::Wallet>>,
|
monero_wallet: Option<Arc<monero::Wallet>>,
|
||||||
monero_rpc_process: Option<monero::WalletRpcProcess>,
|
monero_rpc_process: Option<monero::WalletRpcProcess>,
|
||||||
running_swap: Arc<Mutex<bool>>,
|
|
||||||
pub config: Config,
|
pub config: Config,
|
||||||
pub shutdown: Arc<broadcast::Sender<()>>,
|
pub shutdown: Arc<Shutdown>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Context {
|
impl Context {
|
||||||
|
@ -52,7 +88,7 @@ impl Context {
|
||||||
debug: bool,
|
debug: bool,
|
||||||
json: bool,
|
json: bool,
|
||||||
server_address: Option<SocketAddr>,
|
server_address: Option<SocketAddr>,
|
||||||
shutdown: broadcast::Sender<()>,
|
sender: broadcast::Sender<()>,
|
||||||
) -> Result<Context> {
|
) -> Result<Context> {
|
||||||
let data_dir = data::data_dir_from(data, is_testnet)?;
|
let data_dir = data::data_dir_from(data, is_testnet)?;
|
||||||
let env_config = env_config_from(is_testnet);
|
let env_config = env_config_from(is_testnet);
|
||||||
|
@ -112,8 +148,7 @@ impl Context {
|
||||||
is_testnet,
|
is_testnet,
|
||||||
data_dir,
|
data_dir,
|
||||||
},
|
},
|
||||||
shutdown: Arc::new(shutdown),
|
shutdown: Arc::new(Shutdown::new(sender.subscribe())),
|
||||||
running_swap: Arc::new(Mutex::new(false)),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(context)
|
Ok(context)
|
||||||
|
@ -265,7 +300,6 @@ pub mod api_test {
|
||||||
};
|
};
|
||||||
|
|
||||||
Request::new(
|
Request::new(
|
||||||
tx.subscribe(),
|
|
||||||
Method::BuyXmr,
|
Method::BuyXmr,
|
||||||
Params {
|
Params {
|
||||||
seller: Some(seller),
|
seller: Some(seller),
|
||||||
|
@ -277,9 +311,8 @@ pub mod api_test {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn resume(tx: broadcast::Sender<()>) -> Request {
|
pub fn resume() -> Request {
|
||||||
Request::new(
|
Request::new(
|
||||||
tx.subscribe(),
|
|
||||||
Method::Resume,
|
Method::Resume,
|
||||||
Params {
|
Params {
|
||||||
swap_id: Some(Uuid::from_str(SWAP_ID).unwrap()),
|
swap_id: Some(Uuid::from_str(SWAP_ID).unwrap()),
|
||||||
|
@ -288,9 +321,8 @@ pub mod api_test {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn cancel(tx: broadcast::Sender<()>) -> Request {
|
pub fn cancel() -> Request {
|
||||||
Request::new(
|
Request::new(
|
||||||
tx.subscribe(),
|
|
||||||
Method::CancelAndRefund,
|
Method::CancelAndRefund,
|
||||||
Params {
|
Params {
|
||||||
swap_id: Some(Uuid::from_str(SWAP_ID).unwrap()),
|
swap_id: Some(Uuid::from_str(SWAP_ID).unwrap()),
|
||||||
|
@ -299,9 +331,8 @@ pub mod api_test {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn refund(tx: broadcast::Sender<()>) -> Request {
|
pub fn refund() -> Request {
|
||||||
Request::new(
|
Request::new(
|
||||||
tx.subscribe(),
|
|
||||||
Method::CancelAndRefund,
|
Method::CancelAndRefund,
|
||||||
Params {
|
Params {
|
||||||
swap_id: Some(Uuid::from_str(SWAP_ID).unwrap()),
|
swap_id: Some(Uuid::from_str(SWAP_ID).unwrap()),
|
||||||
|
|
|
@ -18,59 +18,15 @@ use std::future::Future;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::sync::broadcast::Receiver;
|
|
||||||
use tokio::sync::Mutex;
|
|
||||||
use tracing::{debug_span, Instrument};
|
use tracing::{debug_span, Instrument};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
#[derive(PartialEq, Debug)]
|
#[derive(PartialEq, Debug)]
|
||||||
pub struct Request {
|
pub struct Request {
|
||||||
pub params: Params,
|
pub params: Params,
|
||||||
pub cmd: Method,
|
pub cmd: Method
|
||||||
pub shutdown: Shutdown,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Shutdown {
|
|
||||||
pub fn new(notify: Receiver<()>) -> Shutdown {
|
|
||||||
Shutdown {
|
|
||||||
shutdown: false,
|
|
||||||
notify,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns `true` if the shutdown signal has been received.
|
|
||||||
pub fn is_shutdown(&self) -> bool {
|
|
||||||
self.shutdown
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Receive the shutdown notice, waiting if necessary.
|
|
||||||
pub async fn recv(&mut self) {
|
|
||||||
// If the shutdown signal has already been received, then return
|
|
||||||
// immediately.
|
|
||||||
if self.shutdown {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Cannot receive a "lag error" as only one value is ever sent.
|
|
||||||
let _ = self.notify.recv().await;
|
|
||||||
|
|
||||||
self.shutdown = true;
|
|
||||||
|
|
||||||
// Remember that the signal has been received.
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct Shutdown {
|
|
||||||
shutdown: bool,
|
|
||||||
notify: Receiver<()>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl PartialEq for Shutdown {
|
|
||||||
fn eq(&self, other: &Shutdown) -> bool {
|
|
||||||
self.shutdown == other.shutdown
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default, PartialEq, Debug)]
|
#[derive(Default, PartialEq, Debug)]
|
||||||
pub struct Params {
|
pub struct Params {
|
||||||
|
@ -103,11 +59,10 @@ pub enum Method {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Request {
|
impl Request {
|
||||||
pub fn new(shutdownReceiver: Receiver<()>, cmd: Method, params: Params) -> Request {
|
pub fn new(cmd: Method, params: Params) -> Request {
|
||||||
Request {
|
Request {
|
||||||
params,
|
params,
|
||||||
cmd,
|
cmd
|
||||||
shutdown: Shutdown::new(shutdownReceiver),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -218,17 +173,37 @@ impl Request {
|
||||||
bitcoin_change_address,
|
bitcoin_change_address,
|
||||||
amount,
|
amount,
|
||||||
);
|
);
|
||||||
|
let mut halt = context.shutdown.notify.subscribe();
|
||||||
|
|
||||||
tokio::select! {
|
// execution will halt if the server daemon is stopped or a cancel running swap
|
||||||
result = event_loop => {
|
// request is sent
|
||||||
result
|
tokio::spawn(async move {
|
||||||
.context("EventLoop panicked")?;
|
tokio::select! {
|
||||||
},
|
result = event_loop => {
|
||||||
result = bob::run(swap) => {
|
match result {
|
||||||
result
|
Ok(_) => {
|
||||||
.context("Failed to complete swap")?;
|
tracing::debug!(%swap_id, "EventLoop completed")
|
||||||
|
}
|
||||||
|
Err(error) => {
|
||||||
|
tracing::error!(%swap_id, "EventLoop failed: {:#}", error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
result = bob::run(swap) => {
|
||||||
|
match result {
|
||||||
|
Ok(state) => {
|
||||||
|
tracing::debug!(%swap_id, state=%state, "Swap completed")
|
||||||
|
}
|
||||||
|
Err(error) => {
|
||||||
|
tracing::error!(%swap_id, "Failed to complete swap: {:#}", error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ = halt.recv() => {
|
||||||
|
tracing::debug!(%swap_id, "Swap cancel signal received while running swap")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
Ok(json!({
|
Ok(json!({
|
||||||
"empty": "true"
|
"empty": "true"
|
||||||
}))
|
}))
|
||||||
|
@ -240,11 +215,13 @@ impl Request {
|
||||||
let state: BobState = state.try_into()?;
|
let state: BobState = state.try_into()?;
|
||||||
vec.push((swap_id, state.to_string()));
|
vec.push((swap_id, state.to_string()));
|
||||||
}
|
}
|
||||||
|
context.shutdown.notify.send(())?;
|
||||||
|
|
||||||
Ok(json!({ "swaps": vec }))
|
Ok(json!({ "swaps": vec }))
|
||||||
}
|
}
|
||||||
Method::RawHistory => {
|
Method::RawHistory => {
|
||||||
let raw_history = context.db.raw_all().await?;
|
let raw_history = context.db.raw_all().await?;
|
||||||
|
|
||||||
Ok(json!({ "raw_history": raw_history }))
|
Ok(json!({ "raw_history": raw_history }))
|
||||||
}
|
}
|
||||||
Method::GetSeller => {
|
Method::GetSeller => {
|
||||||
|
@ -341,9 +318,11 @@ impl Request {
|
||||||
rpc::run_server(server_address, Arc::clone(&context)).await?;
|
rpc::run_server(server_address, Arc::clone(&context)).await?;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
let shutdown = Arc::clone(&context.shutdown);
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = self.shutdown.recv() => {
|
_ = shutdown.recv() => {
|
||||||
server_handle.stop()?;
|
server_handle.stop()?;
|
||||||
|
context.shutdown.notify.send(())?;
|
||||||
return Ok(json!({
|
return Ok(json!({
|
||||||
"result": []
|
"result": []
|
||||||
}))
|
}))
|
||||||
|
|
|
@ -22,7 +22,7 @@ use tokio::sync::broadcast;
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<()> {
|
async fn main() -> Result<()> {
|
||||||
let (tx, _) = broadcast::channel(1);
|
let (tx, _) = broadcast::channel(1);
|
||||||
let (context, mut request) = match parse_args_and_apply_defaults(env::args_os(), tx).await? {
|
let (context, mut request) = match parse_args_and_apply_defaults(env::args_os(), tx.clone()).await? {
|
||||||
ParseResult::Context(context, request) => (context, request),
|
ParseResult::Context(context, request) => (context, request),
|
||||||
ParseResult::PrintAndExitZero { message } => {
|
ParseResult::PrintAndExitZero { message } => {
|
||||||
println!("{}", message);
|
println!("{}", message);
|
||||||
|
@ -34,6 +34,7 @@ async fn main() -> Result<()> {
|
||||||
eprintln!("{}", e);
|
eprintln!("{}", e);
|
||||||
}
|
}
|
||||||
let _result = request.call(Arc::clone(&context)).await?;
|
let _result = request.call(Arc::clone(&context)).await?;
|
||||||
|
tx.send(())?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -79,7 +79,6 @@ where
|
||||||
bitcoin_address::validate(bitcoin_change_address, is_testnet)?;
|
bitcoin_address::validate(bitcoin_change_address, is_testnet)?;
|
||||||
|
|
||||||
let request = Request::new(
|
let request = Request::new(
|
||||||
rx.subscribe(),
|
|
||||||
Method::BuyXmr,
|
Method::BuyXmr,
|
||||||
Params {
|
Params {
|
||||||
bitcoin_change_address: Some(bitcoin_change_address),
|
bitcoin_change_address: Some(bitcoin_change_address),
|
||||||
|
@ -104,21 +103,21 @@ where
|
||||||
(context, request)
|
(context, request)
|
||||||
}
|
}
|
||||||
CliCommand::History => {
|
CliCommand::History => {
|
||||||
let request = Request::new(rx.subscribe(), Method::History, Params::default());
|
let request = Request::new(Method::History, Params::default());
|
||||||
|
|
||||||
let context =
|
let context =
|
||||||
Context::build(None, None, None, data, is_testnet, debug, json, None, rx).await?;
|
Context::build(None, None, None, data, is_testnet, debug, json, None, rx).await?;
|
||||||
(context, request)
|
(context, request)
|
||||||
}
|
}
|
||||||
CliCommand::Config => {
|
CliCommand::Config => {
|
||||||
let request = Request::new(rx.subscribe(), Method::Config, Params::default());
|
let request = Request::new(Method::Config, Params::default());
|
||||||
|
|
||||||
let context =
|
let context =
|
||||||
Context::build(None, None, None, data, is_testnet, debug, json, None, rx).await?;
|
Context::build(None, None, None, data, is_testnet, debug, json, None, rx).await?;
|
||||||
(context, request)
|
(context, request)
|
||||||
}
|
}
|
||||||
CliCommand::Balance { bitcoin } => {
|
CliCommand::Balance { bitcoin } => {
|
||||||
let request = Request::new(rx.subscribe(), Method::Balance, Params::default());
|
let request = Request::new(Method::Balance, Params::default());
|
||||||
|
|
||||||
let context = Context::build(
|
let context = Context::build(
|
||||||
Some(bitcoin),
|
Some(bitcoin),
|
||||||
|
@ -141,7 +140,6 @@ where
|
||||||
tor,
|
tor,
|
||||||
} => {
|
} => {
|
||||||
let request = Request::new(
|
let request = Request::new(
|
||||||
rx.subscribe(),
|
|
||||||
Method::StartDaemon,
|
Method::StartDaemon,
|
||||||
Params {
|
Params {
|
||||||
server_address,
|
server_address,
|
||||||
|
@ -171,7 +169,6 @@ where
|
||||||
let address = bitcoin_address::validate(address, is_testnet)?;
|
let address = bitcoin_address::validate(address, is_testnet)?;
|
||||||
|
|
||||||
let request = Request::new(
|
let request = Request::new(
|
||||||
rx.subscribe(),
|
|
||||||
Method::WithdrawBtc,
|
Method::WithdrawBtc,
|
||||||
Params {
|
Params {
|
||||||
amount,
|
amount,
|
||||||
|
@ -201,7 +198,6 @@ where
|
||||||
tor,
|
tor,
|
||||||
} => {
|
} => {
|
||||||
let request = Request::new(
|
let request = Request::new(
|
||||||
rx.subscribe(),
|
|
||||||
Method::Resume,
|
Method::Resume,
|
||||||
Params {
|
Params {
|
||||||
swap_id: Some(swap_id),
|
swap_id: Some(swap_id),
|
||||||
|
@ -229,7 +225,6 @@ where
|
||||||
tor,
|
tor,
|
||||||
} => {
|
} => {
|
||||||
let request = Request::new(
|
let request = Request::new(
|
||||||
rx.subscribe(),
|
|
||||||
Method::CancelAndRefund,
|
Method::CancelAndRefund,
|
||||||
Params {
|
Params {
|
||||||
swap_id: Some(swap_id),
|
swap_id: Some(swap_id),
|
||||||
|
@ -256,7 +251,6 @@ where
|
||||||
tor,
|
tor,
|
||||||
} => {
|
} => {
|
||||||
let request = Request::new(
|
let request = Request::new(
|
||||||
rx.subscribe(),
|
|
||||||
Method::ListSellers,
|
Method::ListSellers,
|
||||||
Params {
|
Params {
|
||||||
rendezvous_point: Some(rendezvous_point),
|
rendezvous_point: Some(rendezvous_point),
|
||||||
|
@ -281,7 +275,6 @@ where
|
||||||
}
|
}
|
||||||
CliCommand::ExportBitcoinWallet { bitcoin } => {
|
CliCommand::ExportBitcoinWallet { bitcoin } => {
|
||||||
let request = Request::new(
|
let request = Request::new(
|
||||||
rx.subscribe(),
|
|
||||||
Method::ExportBitcoinWallet,
|
Method::ExportBitcoinWallet,
|
||||||
Params::default(),
|
Params::default(),
|
||||||
);
|
);
|
||||||
|
@ -304,7 +297,6 @@ where
|
||||||
swap_id: SwapId { swap_id },
|
swap_id: SwapId { swap_id },
|
||||||
} => {
|
} => {
|
||||||
let request = Request::new(
|
let request = Request::new(
|
||||||
rx.subscribe(),
|
|
||||||
Method::MoneroRecovery,
|
Method::MoneroRecovery,
|
||||||
Params {
|
Params {
|
||||||
swap_id: Some(swap_id),
|
swap_id: Some(swap_id),
|
||||||
|
|
|
@ -164,7 +164,7 @@ async fn execute_request(
|
||||||
params: Params,
|
params: Params,
|
||||||
context: &Arc<Context>,
|
context: &Arc<Context>,
|
||||||
) -> Result<serde_json::Value, jsonrpsee_core::Error> {
|
) -> Result<serde_json::Value, jsonrpsee_core::Error> {
|
||||||
let mut request = Request::new(context.shutdown.subscribe(), cmd, params);
|
let mut request = Request::new(cmd, params);
|
||||||
request
|
request
|
||||||
.call(Arc::clone(context))
|
.call(Arc::clone(context))
|
||||||
.await
|
.await
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue