diff --git a/swap/src/api.rs b/swap/src/api.rs index 8f3db7bb..10aba4b6 100644 --- a/swap/src/api.rs +++ b/swap/src/api.rs @@ -12,7 +12,7 @@ use std::fmt; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::{Arc, Once}; -use tokio::sync::{broadcast, Mutex}; +use tokio::sync::{broadcast, Mutex, broadcast::Receiver, broadcast::Sender}; use url::Url; static START: Once = Once::new(); @@ -30,6 +30,43 @@ pub struct Config { pub is_testnet: bool, } +impl Shutdown { + pub fn new(listen: Receiver<()>) -> Shutdown { + let (notify, _) = broadcast::channel(16); + Shutdown { + shutdown: Mutex::new(false), + listen: Mutex::new(listen), + notify + } + } + + /// Receive the shutdown notice, waiting if necessary. + pub async fn recv(&self) { + // If the shutdown signal has already been received, then return + // immediately. + let mut guard_shutdown = self.shutdown.lock().await; + if *guard_shutdown { + return; + } + + let _ = self.listen.lock().await.recv().await; + + // Remember that the signal has been received. + *guard_shutdown = true; + + // Send shutdown request to child tasks + + } +} + +#[derive(Debug)] +pub struct Shutdown { + shutdown: Mutex, + listen: Mutex>, + notify: Sender<()>, +} + + // workaround for warning over monero_rpc_process which we must own but not read #[allow(dead_code)] pub struct Context { @@ -37,9 +74,8 @@ pub struct Context { bitcoin_wallet: Option>, monero_wallet: Option>, monero_rpc_process: Option, - running_swap: Arc>, pub config: Config, - pub shutdown: Arc>, + pub shutdown: Arc, } impl Context { @@ -52,7 +88,7 @@ impl Context { debug: bool, json: bool, server_address: Option, - shutdown: broadcast::Sender<()>, + sender: broadcast::Sender<()>, ) -> Result { let data_dir = data::data_dir_from(data, is_testnet)?; let env_config = env_config_from(is_testnet); @@ -112,8 +148,7 @@ impl Context { is_testnet, data_dir, }, - shutdown: Arc::new(shutdown), - running_swap: Arc::new(Mutex::new(false)), + shutdown: Arc::new(Shutdown::new(sender.subscribe())), }; Ok(context) @@ -265,7 +300,6 @@ pub mod api_test { }; Request::new( - tx.subscribe(), Method::BuyXmr, Params { seller: Some(seller), @@ -277,9 +311,8 @@ pub mod api_test { ) } - pub fn resume(tx: broadcast::Sender<()>) -> Request { + pub fn resume() -> Request { Request::new( - tx.subscribe(), Method::Resume, Params { swap_id: Some(Uuid::from_str(SWAP_ID).unwrap()), @@ -288,9 +321,8 @@ pub mod api_test { ) } - pub fn cancel(tx: broadcast::Sender<()>) -> Request { + pub fn cancel() -> Request { Request::new( - tx.subscribe(), Method::CancelAndRefund, Params { swap_id: Some(Uuid::from_str(SWAP_ID).unwrap()), @@ -299,9 +331,8 @@ pub mod api_test { ) } - pub fn refund(tx: broadcast::Sender<()>) -> Request { + pub fn refund() -> Request { Request::new( - tx.subscribe(), Method::CancelAndRefund, Params { swap_id: Some(Uuid::from_str(SWAP_ID).unwrap()), diff --git a/swap/src/api/request.rs b/swap/src/api/request.rs index e4360c7a..9d8167f4 100644 --- a/swap/src/api/request.rs +++ b/swap/src/api/request.rs @@ -18,59 +18,15 @@ use std::future::Future; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; -use tokio::sync::broadcast::Receiver; -use tokio::sync::Mutex; use tracing::{debug_span, Instrument}; use uuid::Uuid; #[derive(PartialEq, Debug)] pub struct Request { pub params: Params, - pub cmd: Method, - pub shutdown: Shutdown, + pub cmd: Method } -impl Shutdown { - pub fn new(notify: Receiver<()>) -> Shutdown { - Shutdown { - shutdown: false, - notify, - } - } - - /// Returns `true` if the shutdown signal has been received. - pub fn is_shutdown(&self) -> bool { - self.shutdown - } - - /// Receive the shutdown notice, waiting if necessary. - pub async fn recv(&mut self) { - // If the shutdown signal has already been received, then return - // immediately. - if self.shutdown { - return; - } - - // Cannot receive a "lag error" as only one value is ever sent. - let _ = self.notify.recv().await; - - self.shutdown = true; - - // Remember that the signal has been received. - } -} - -#[derive(Debug)] -pub struct Shutdown { - shutdown: bool, - notify: Receiver<()>, -} - -impl PartialEq for Shutdown { - fn eq(&self, other: &Shutdown) -> bool { - self.shutdown == other.shutdown - } -} #[derive(Default, PartialEq, Debug)] pub struct Params { @@ -103,11 +59,10 @@ pub enum Method { } impl Request { - pub fn new(shutdownReceiver: Receiver<()>, cmd: Method, params: Params) -> Request { + pub fn new(cmd: Method, params: Params) -> Request { Request { params, - cmd, - shutdown: Shutdown::new(shutdownReceiver), + cmd } } @@ -218,17 +173,37 @@ impl Request { bitcoin_change_address, amount, ); + let mut halt = context.shutdown.notify.subscribe(); - tokio::select! { - result = event_loop => { - result - .context("EventLoop panicked")?; - }, - result = bob::run(swap) => { - result - .context("Failed to complete swap")?; + // execution will halt if the server daemon is stopped or a cancel running swap + // request is sent + tokio::spawn(async move { + tokio::select! { + result = event_loop => { + match result { + Ok(_) => { + tracing::debug!(%swap_id, "EventLoop completed") + } + Err(error) => { + tracing::error!(%swap_id, "EventLoop failed: {:#}", error) + } + } + }, + result = bob::run(swap) => { + match result { + Ok(state) => { + tracing::debug!(%swap_id, state=%state, "Swap completed") + } + Err(error) => { + tracing::error!(%swap_id, "Failed to complete swap: {:#}", error) + } + } + } + _ = halt.recv() => { + tracing::debug!(%swap_id, "Swap cancel signal received while running swap") + } } - } + }); Ok(json!({ "empty": "true" })) @@ -240,11 +215,13 @@ impl Request { let state: BobState = state.try_into()?; vec.push((swap_id, state.to_string())); } + context.shutdown.notify.send(())?; Ok(json!({ "swaps": vec })) } Method::RawHistory => { let raw_history = context.db.raw_all().await?; + Ok(json!({ "raw_history": raw_history })) } Method::GetSeller => { @@ -341,9 +318,11 @@ impl Request { rpc::run_server(server_address, Arc::clone(&context)).await?; loop { + let shutdown = Arc::clone(&context.shutdown); tokio::select! { - _ = self.shutdown.recv() => { + _ = shutdown.recv() => { server_handle.stop()?; + context.shutdown.notify.send(())?; return Ok(json!({ "result": [] })) diff --git a/swap/src/bin/swap.rs b/swap/src/bin/swap.rs index e86cf75a..54a1447f 100644 --- a/swap/src/bin/swap.rs +++ b/swap/src/bin/swap.rs @@ -22,7 +22,7 @@ use tokio::sync::broadcast; #[tokio::main] async fn main() -> Result<()> { let (tx, _) = broadcast::channel(1); - let (context, mut request) = match parse_args_and_apply_defaults(env::args_os(), tx).await? { + let (context, mut request) = match parse_args_and_apply_defaults(env::args_os(), tx.clone()).await? { ParseResult::Context(context, request) => (context, request), ParseResult::PrintAndExitZero { message } => { println!("{}", message); @@ -34,6 +34,7 @@ async fn main() -> Result<()> { eprintln!("{}", e); } let _result = request.call(Arc::clone(&context)).await?; + tx.send(())?; Ok(()) } diff --git a/swap/src/cli/command.rs b/swap/src/cli/command.rs index e3c664b1..7a58e99e 100644 --- a/swap/src/cli/command.rs +++ b/swap/src/cli/command.rs @@ -79,7 +79,6 @@ where bitcoin_address::validate(bitcoin_change_address, is_testnet)?; let request = Request::new( - rx.subscribe(), Method::BuyXmr, Params { bitcoin_change_address: Some(bitcoin_change_address), @@ -104,21 +103,21 @@ where (context, request) } CliCommand::History => { - let request = Request::new(rx.subscribe(), Method::History, Params::default()); + let request = Request::new(Method::History, Params::default()); let context = Context::build(None, None, None, data, is_testnet, debug, json, None, rx).await?; (context, request) } CliCommand::Config => { - let request = Request::new(rx.subscribe(), Method::Config, Params::default()); + let request = Request::new(Method::Config, Params::default()); let context = Context::build(None, None, None, data, is_testnet, debug, json, None, rx).await?; (context, request) } CliCommand::Balance { bitcoin } => { - let request = Request::new(rx.subscribe(), Method::Balance, Params::default()); + let request = Request::new(Method::Balance, Params::default()); let context = Context::build( Some(bitcoin), @@ -141,7 +140,6 @@ where tor, } => { let request = Request::new( - rx.subscribe(), Method::StartDaemon, Params { server_address, @@ -171,7 +169,6 @@ where let address = bitcoin_address::validate(address, is_testnet)?; let request = Request::new( - rx.subscribe(), Method::WithdrawBtc, Params { amount, @@ -201,7 +198,6 @@ where tor, } => { let request = Request::new( - rx.subscribe(), Method::Resume, Params { swap_id: Some(swap_id), @@ -229,7 +225,6 @@ where tor, } => { let request = Request::new( - rx.subscribe(), Method::CancelAndRefund, Params { swap_id: Some(swap_id), @@ -256,7 +251,6 @@ where tor, } => { let request = Request::new( - rx.subscribe(), Method::ListSellers, Params { rendezvous_point: Some(rendezvous_point), @@ -281,7 +275,6 @@ where } CliCommand::ExportBitcoinWallet { bitcoin } => { let request = Request::new( - rx.subscribe(), Method::ExportBitcoinWallet, Params::default(), ); @@ -304,7 +297,6 @@ where swap_id: SwapId { swap_id }, } => { let request = Request::new( - rx.subscribe(), Method::MoneroRecovery, Params { swap_id: Some(swap_id), diff --git a/swap/src/rpc/methods.rs b/swap/src/rpc/methods.rs index dbdb0479..3b66de11 100644 --- a/swap/src/rpc/methods.rs +++ b/swap/src/rpc/methods.rs @@ -164,7 +164,7 @@ async fn execute_request( params: Params, context: &Arc, ) -> Result { - let mut request = Request::new(context.shutdown.subscribe(), cmd, params); + let mut request = Request::new(cmd, params); request .call(Arc::clone(context)) .await