From cf5efa8ad0562e07c7ea3d164ffe6dd1b90b54a6 Mon Sep 17 00:00:00 2001 From: Lorenzo Tucci Date: Fri, 9 Dec 2022 22:33:52 +0100 Subject: [PATCH] add broadcast channel to handle shutdowns gracefully and prepare for RPC server test --- Cargo.lock | 143 +++++++++++++------ swap/Cargo.toml | 5 +- swap/src/api.rs | 25 +++- swap/src/api/request.rs | 71 ++++++++-- swap/src/bin/swap.rs | 6 +- swap/src/cli/command.rs | 294 ++++++++++++++++++++++++---------------- swap/src/rpc.rs | 6 +- swap/src/rpc/methods.rs | 31 +++-- 8 files changed, 382 insertions(+), 199 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2262f425..491fe8b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1658,9 +1658,12 @@ checksum = "d87c48c02e0dc5e3b849a2041db3029fd066650f8f717c07bf8ed78ccb895cac" dependencies = [ "http", "hyper", + "log", "rustls 0.20.2", + "rustls-native-certs 0.6.2", "tokio", "tokio-rustls 0.23.1", + "webpki-roots 0.22.2", ] [[package]] @@ -1810,14 +1813,13 @@ dependencies = [ [[package]] name = "jsonrpsee" -version = "0.15.1" +version = "0.16.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8bd0d559d5e679b1ab2f869b486a11182923863b1b3ee8b421763cdd707b783a" +checksum = "7d291e3a5818a2384645fd9756362e6d89cf0541b0b916fa7702ea4a9833608e" dependencies = [ - "jsonrpsee-core", - "jsonrpsee-http-server", - "jsonrpsee-types", - "jsonrpsee-ws-server", + "jsonrpsee-core 0.16.2", + "jsonrpsee-server", + "jsonrpsee-types 0.16.2", ] [[package]] @@ -1825,6 +1827,25 @@ name = "jsonrpsee-core" version = "0.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3dc3e9cf2ba50b7b1d7d76a667619f82846caa39e8e8daa8a4962d74acaddca" +dependencies = [ + "anyhow", + "async-trait", + "beef", + "futures-channel", + "futures-util", + "hyper", + "jsonrpsee-types 0.15.1", + "serde", + "serde_json", + "thiserror", + "tracing", +] + +[[package]] +name = "jsonrpsee-core" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4e70b4439a751a5de7dd5ed55eacff78ebf4ffe0fc009cb1ebb11417f5b536b" dependencies = [ "anyhow", "arrayvec 0.7.2", @@ -1833,10 +1854,8 @@ dependencies = [ "futures-channel", "futures-util", "globset", - "http", "hyper", - "jsonrpsee-types", - "lazy_static", + "jsonrpsee-types 0.16.2", "parking_lot 0.12.0", "rand 0.8.3", "rustc-hash", @@ -1846,27 +1865,50 @@ dependencies = [ "thiserror", "tokio", "tracing", - "unicase", ] [[package]] -name = "jsonrpsee-http-server" +name = "jsonrpsee-http-client" version = "0.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03802f0373a38c2420c70b5144742d800b509e2937edc4afb116434f07120117" +checksum = "52f7c0e2333ab2115c302eeb4f137c8a4af5ab609762df68bbda8f06496677c9" dependencies = [ - "futures-channel", - "futures-util", + "async-trait", "hyper", - "jsonrpsee-core", - "jsonrpsee-types", + "hyper-rustls", + "jsonrpsee-core 0.15.1", + "jsonrpsee-types 0.15.1", + "rustc-hash", "serde", "serde_json", + "thiserror", "tokio", "tracing", "tracing-futures", ] +[[package]] +name = "jsonrpsee-server" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fb69dad85df79527c019659a992498d03f8495390496da2f07e6c24c2b356fc" +dependencies = [ + "futures-channel", + "futures-util", + "http", + "hyper", + "jsonrpsee-core 0.16.2", + "jsonrpsee-types 0.16.2", + "serde", + "serde_json", + "soketto", + "tokio", + "tokio-stream", + "tokio-util 0.7.3", + "tower", + "tracing", +] + [[package]] name = "jsonrpsee-types" version = "0.15.1" @@ -1882,23 +1924,17 @@ dependencies = [ ] [[package]] -name = "jsonrpsee-ws-server" -version = "0.15.1" +name = "jsonrpsee-types" +version = "0.16.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d488ba74fb369e5ab68926feb75a483458b88e768d44319f37e4ecad283c7325" +checksum = "5bd522fe1ce3702fd94812965d7bb7a3364b1c9aba743944c5a00529aae80f8c" dependencies = [ - "futures-channel", - "futures-util", - "http", - "jsonrpsee-core", - "jsonrpsee-types", + "anyhow", + "beef", + "serde", "serde_json", - "soketto", - "tokio", - "tokio-stream", - "tokio-util 0.7.3", + "thiserror", "tracing", - "tracing-futures", ] [[package]] @@ -2277,9 +2313,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.14" +version = "0.4.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" +checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" dependencies = [ "cfg-if 1.0.0", ] @@ -3464,6 +3500,18 @@ dependencies = [ "security-framework", ] +[[package]] +name = "rustls-native-certs" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "1.0.0" @@ -3908,6 +3956,7 @@ dependencies = [ "bytes", "flate2", "futures", + "http", "httparse", "log", "rand 0.8.3", @@ -4161,7 +4210,8 @@ dependencies = [ "hyper", "itertools", "jsonrpsee", - "jsonrpsee-core", + "jsonrpsee-core 0.16.2", + "jsonrpsee-http-client", "libp2p", "monero", "monero-harness", @@ -4550,6 +4600,23 @@ dependencies = [ "tokio", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.1" @@ -4563,6 +4630,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if 1.0.0", + "log", "pin-project-lite 0.2.9", "tracing-attributes", "tracing-core", @@ -4737,7 +4805,7 @@ dependencies = [ "log", "rand 0.8.3", "rustls 0.19.0", - "rustls-native-certs", + "rustls-native-certs 0.5.0", "sha-1", "thiserror", "url", @@ -4769,15 +4837,6 @@ dependencies = [ "static_assertions", ] -[[package]] -name = "unicase" -version = "2.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" -dependencies = [ - "version_check", -] - [[package]] name = "unicode-bidi" version = "0.3.4" diff --git a/swap/Cargo.toml b/swap/Cargo.toml index d6b0245b..94938cdd 100644 --- a/swap/Cargo.toml +++ b/swap/Cargo.toml @@ -31,8 +31,8 @@ ed25519-dalek = "1" futures = { version = "0.3", default-features = false } hex = "0.4" itertools = "0.10" -jsonrpsee = { version = "0.15.1", features = [ "server" ] } -jsonrpsee-core = "0.15.1" +jsonrpsee = { version = "0.16.2", features = [ "server" ] } +jsonrpsee-core = "0.16.2" libp2p = { version = "0.42.2", default-features = false, features = [ "tcp-tokio", "yamux", "mplex", "dns-tokio", "noise", "request-response", "websocket", "ping", "rendezvous", "identify" ] } monero = { version = "0.12", features = [ "serde_support" ] } monero-rpc = { path = "../monero-rpc" } @@ -81,6 +81,7 @@ get-port = "3" hyper = "0.14" monero-harness = { path = "../monero-harness" } port_check = "0.1" +jsonrpsee-http-client = "0.15.1" proptest = "1" serde_cbor = "0.11" spectral = "0.6" diff --git a/swap/src/api.rs b/swap/src/api.rs index 7347f500..3643014a 100644 --- a/swap/src/api.rs +++ b/swap/src/api.rs @@ -11,9 +11,10 @@ use anyhow::{Context as AnyContext, Result}; use std::fmt; use std::net::SocketAddr; use std::path::PathBuf; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use url::Url; use std::sync::Once; +use tokio::sync::broadcast; static START: Once = Once::new(); @@ -35,6 +36,7 @@ pub struct Context { monero_wallet: Option>, monero_rpc_process: Option, pub config: Config, + pub shutdown: Arc>, } @@ -48,6 +50,7 @@ impl Context { debug: bool, json: bool, server_address: Option, + shutdown: broadcast::Sender<()>, ) -> Result { let data_dir = data::data_dir_from(data, is_testnet)?; let env_config = env_config_from(is_testnet); @@ -113,6 +116,7 @@ impl Context { json, is_testnet, }, + shutdown: Arc::new(shutdown), }; Ok(init) @@ -126,6 +130,7 @@ impl fmt::Debug for Context { } } + async fn init_bitcoin_wallet( electrum_rpc_url: Url, seed: &Seed, @@ -207,6 +212,10 @@ pub mod api_test { use super::*; use crate::tor::DEFAULT_SOCKS5_PORT; use std::str::FromStr; + use uuid::Uuid; + use crate::api::request::{Request, Params, Method, Shutdown}; + use libp2p::Multiaddr; + use tokio::sync::broadcast; pub const MULTI_ADDRESS: &str = "/ip4/127.0.0.1/tcp/9939/p2p/12D3KooWCdMKjesXMJz1SiZ7HgotrxuqhQJbP5sgBm2BwP1cqThi"; @@ -236,13 +245,13 @@ pub mod api_test { seed: Some(seed), debug, json, - is_testnet + is_testnet, } } } impl Request { - pub fn buy_xmr(is_testnet: bool) -> Request { + pub fn buy_xmr(is_testnet: bool, tx: broadcast::Sender<()>) -> Request { let seller = Multiaddr::from_str(MULTI_ADDRESS).unwrap(); let bitcoin_change_address = { if is_testnet { @@ -268,36 +277,40 @@ pub mod api_test { ..Default::default() }, cmd: Method::BuyXmr, + shutdown: Shutdown::new(tx.subscribe()), } } - pub fn resume() -> Request { + pub fn resume(tx: broadcast::Sender<()>) -> Request { Request { params: Params { swap_id: Some(Uuid::from_str(SWAP_ID).unwrap()), ..Default::default() }, cmd: Method::Resume, + shutdown: Shutdown::new(tx.subscribe()), } } - pub fn cancel() -> Request { + pub fn cancel(tx: broadcast::Sender<()>) -> Request { Request { params: Params { swap_id: Some(Uuid::from_str(SWAP_ID).unwrap()), ..Default::default() }, cmd: Method::Cancel, + shutdown: Shutdown::new(tx.subscribe()), } } - pub fn refund() -> Request { + pub fn refund(tx: broadcast::Sender<()>) -> Request { Request { params: Params { swap_id: Some(Uuid::from_str(SWAP_ID).unwrap()), ..Default::default() }, cmd: Method::Refund, + shutdown: Shutdown::new(tx.subscribe()), } } } diff --git a/swap/src/api/request.rs b/swap/src/api/request.rs index 9bfe1504..ac7daea1 100644 --- a/swap/src/api/request.rs +++ b/swap/src/api/request.rs @@ -18,12 +18,56 @@ use std::sync::Arc; use std::time::Duration; use uuid::Uuid; use crate::api::Context; +use tokio::sync::broadcast; #[derive(PartialEq, Debug)] pub struct Request { pub params: Params, pub cmd: Method, + pub shutdown: Shutdown, +} + +impl Shutdown { + pub fn new(notify: broadcast::Receiver<()>) -> Shutdown { + Shutdown { + shutdown: false, + notify, + } + } + + /// Returns `true` if the shutdown signal has been received. + pub fn is_shutdown(&self) -> bool { + self.shutdown + } + + /// Receive the shutdown notice, waiting if necessary. + pub async fn recv(&mut self) { + // If the shutdown signal has already been received, then return + // immediately. + if self.shutdown { + return; + } + + // Cannot receive a "lag error" as only one value is ever sent. + let _ = self.notify.recv().await; + + self.shutdown = true; + + // Remember that the signal has been received. + } +} + +#[derive(Debug)] +pub struct Shutdown { + shutdown: bool, + notify: broadcast::Receiver<()> +} + +impl PartialEq for Shutdown { + fn eq(&self, other: &Shutdown) -> bool { + self.shutdown == other.shutdown + } } #[derive(Default, PartialEq, Debug)] @@ -37,6 +81,7 @@ pub struct Params { pub address: Option, } + #[derive(Debug, PartialEq)] pub enum Method { BuyXmr, @@ -57,7 +102,7 @@ pub enum Method { } impl Request { - pub async fn call(&self, context: Arc) -> Result { + pub async fn call(&mut self, context: Arc) -> Result { let result = match self.cmd { Method::BuyXmr => { let swap_id = Uuid::new_v4(); @@ -247,19 +292,18 @@ impl Request { Method::StartDaemon => { let addr2 = "127.0.0.1:1234".parse()?; - let server_handle = { - if let Some(addr) = context.config.server_address { - let (_addr, handle) = rpc::run_server(addr, context).await?; - Some(handle) - } else { - let (_addr, handle) = rpc::run_server(addr2, context).await?; - Some(handle) + let (_, server_handle) = rpc::run_server(addr2, Arc::clone(&context)).await?; + + loop { + tokio::select! { + _ = self.shutdown.recv() => { + server_handle.stop(); + return Ok(json!({ + "result": [] + })) + } } - }; - loop {} - json!({ - "result": [] - }) + } } Method::Balance => { let bitcoin_wallet = context.bitcoin_wallet.as_ref().unwrap(); @@ -458,6 +502,7 @@ impl Request { } } + fn qr_code(value: &impl ToString) -> Result { let code = QrCode::new(value.to_string())?; let qr_code = code diff --git a/swap/src/bin/swap.rs b/swap/src/bin/swap.rs index 8be7cbae..da8954a1 100644 --- a/swap/src/bin/swap.rs +++ b/swap/src/bin/swap.rs @@ -17,10 +17,12 @@ use std::env; use std::sync::Arc; use swap::cli::command::{parse_args_and_apply_defaults, ParseResult}; use swap::common::check_latest_version; +use tokio::sync::broadcast; #[tokio::main] async fn main() -> Result<()> { - let (context, request) = match parse_args_and_apply_defaults(env::args_os()).await? { + let (tx, mut rx1) = broadcast::channel(1); + let (context, mut request) = match parse_args_and_apply_defaults(env::args_os(), tx).await? { ParseResult::Context(context, request) => (context, request), ParseResult::PrintAndExitZero { message } => { println!("{}", message); @@ -42,7 +44,7 @@ mod tests { use ::bitcoin::Amount; use std::sync::Mutex; use std::time::Duration; - use swap::api::determine_btc_to_swap; + use swap::api::request::determine_btc_to_swap; use swap::network::quote::BidQuote; use swap::tracing_ext::capture_logs; use tracing::level_filters::LevelFilter; diff --git a/swap/src/cli/command.rs b/swap/src/cli/command.rs index 42848820..00c7f937 100644 --- a/swap/src/cli/command.rs +++ b/swap/src/cli/command.rs @@ -1,5 +1,5 @@ use crate::api::Context; -use crate::api::request::{Request, Params, Method}; +use crate::api::request::{Request, Params, Method, Shutdown}; use crate::bitcoin::{Amount, bitcoin_address}; use crate::monero::monero_address; use crate::monero; @@ -13,6 +13,7 @@ use std::sync::Arc; use structopt::{clap, StructOpt}; use url::Url; use uuid::Uuid; +use tokio::sync::broadcast; // See: https://moneroworld.com/ pub const DEFAULT_MONERO_DAEMON_ADDRESS: &str = "node.community.rino.io:18081"; @@ -41,7 +42,7 @@ pub enum ParseResult { PrintAndExitZero { message: String }, } -pub async fn parse_args_and_apply_defaults(raw_args: I) -> Result +pub async fn parse_args_and_apply_defaults(raw_args: I, rx: broadcast::Sender<()>) -> Result where I: IntoIterator, T: Into + Clone, @@ -60,7 +61,6 @@ where let json = args.json; let is_testnet = args.testnet; let data = args.data; - let (context, request) = match args.cmd { CliCommand::BuyXmr { seller: Seller { seller }, @@ -70,18 +70,6 @@ where monero_receive_address, tor, } => { - let context = Context::build( - Some(bitcoin), - Some(monero), - Some(tor), - data, - is_testnet, - debug, - json, - None, - ) - .await?; - let monero_receive_address = monero_address::validate(monero_receive_address, is_testnet)?; let bitcoin_change_address = bitcoin_address::validate(bitcoin_change_address, is_testnet)?; @@ -93,30 +81,52 @@ where ..Default::default() }, cmd: Method::BuyXmr, + shutdown: Shutdown::new(rx.subscribe()), }; + + let context = Context::build( + Some(bitcoin), + Some(monero), + Some(tor), + data, + is_testnet, + debug, + json, + None, + rx, + ) + .await?; (context, request) } CliCommand::History => { - let context = - Context::build(None, None, None, data, is_testnet, debug, json, None).await?; - let request = Request { params: Params::default(), cmd: Method::History, + shutdown: Shutdown::new(rx.subscribe()), }; + + let context = + Context::build(None, None, None, data, is_testnet, debug, json, None, rx).await?; (context, request) } CliCommand::Config => { - let context = - Context::build(None, None, None, data, is_testnet, debug, json, None).await?; - let request = Request { params: Params::default(), cmd: Method::Config, + shutdown: Shutdown::new(rx.subscribe()), }; + + let context = + Context::build(None, None, None, data, is_testnet, debug, json, None, rx).await?; (context, request) } CliCommand::Balance { bitcoin } => { + let request = Request { + params: Params::default(), + cmd: Method::Balance, + shutdown: Shutdown::new(rx.subscribe()), + }; + let context = Context::build( Some(bitcoin), None, @@ -126,12 +136,9 @@ where debug, json, None, + rx, ) .await?; - let request = Request { - params: Params::default(), - cmd: Method::Balance, - }; (context, request) } CliCommand::StartDaemon { @@ -140,6 +147,12 @@ where monero, tor, } => { + let request = Request { + params: Params::default(), + cmd: Method::StartDaemon, + shutdown: Shutdown::new(rx.subscribe()), + }; + let context = Context::build( Some(bitcoin), Some(monero), @@ -149,12 +162,9 @@ where debug, json, server_address, + rx, ) .await?; - let request = Request { - params: Params::default(), - cmd: Method::StartDaemon, - }; (context, request) } CliCommand::WithdrawBtc { @@ -162,18 +172,6 @@ where amount, address, } => { - let context = Context::build( - Some(bitcoin), - None, - None, - data, - is_testnet, - debug, - json, - None, - ) - .await?; - let address = bitcoin_address::validate(address, is_testnet)?; let request = Request { @@ -183,7 +181,21 @@ where ..Default::default() }, cmd: Method::WithdrawBtc, + shutdown: Shutdown::new(rx.subscribe()), }; + + let context = Context::build( + Some(bitcoin), + None, + None, + data, + is_testnet, + debug, + json, + None, + rx, + ) + .await?; (context, request) } CliCommand::Resume { @@ -192,6 +204,15 @@ where monero, tor, } => { + let request = Request { + params: Params { + swap_id: Some(swap_id), + ..Default::default() + }, + cmd: Method::Resume, + shutdown: Shutdown::new(rx.subscribe()), + }; + let context = Context::build( Some(bitcoin), Some(monero), @@ -201,15 +222,9 @@ where debug, json, None, + rx, ) .await?; - let request = Request { - params: Params { - swap_id: Some(swap_id), - ..Default::default() - }, - cmd: Method::Resume, - }; (context, request) } CliCommand::Cancel { @@ -217,6 +232,15 @@ where bitcoin, tor, } => { + let request = Request { + params: Params { + swap_id: Some(swap_id), + ..Default::default() + }, + cmd: Method::Cancel, + shutdown: Shutdown::new(rx.subscribe()), + }; + let context = Context::build( Some(bitcoin), None, @@ -226,15 +250,9 @@ where debug, json, None, + rx, ) .await?; - let request = Request { - params: Params { - swap_id: Some(swap_id), - ..Default::default() - }, - cmd: Method::Cancel, - }; (context, request) } CliCommand::Refund { @@ -242,6 +260,15 @@ where bitcoin, tor, } => { + let request = Request { + params: Params { + swap_id: Some(swap_id), + ..Default::default() + }, + cmd: Method::Refund, + shutdown: Shutdown::new(rx.subscribe()), + }; + let context = Context::build( Some(bitcoin), None, @@ -251,34 +278,36 @@ where debug, json, None, + rx, ) .await?; - let request = Request { - params: Params { - swap_id: Some(swap_id), - ..Default::default() - }, - cmd: Method::Refund, - }; (context, request) } CliCommand::ListSellers { rendezvous_point, tor, } => { - let context = - Context::build(None, None, Some(tor), data, is_testnet, debug, json, None).await?; - let request = Request { params: Params { rendezvous_point: Some(rendezvous_point), ..Default::default() }, cmd: Method::ListSellers, + shutdown: Shutdown::new(rx.subscribe()), }; + + let context = + Context::build(None, None, Some(tor), data, is_testnet, debug, json, None, rx).await?; + (context, request) } CliCommand::ExportBitcoinWallet { bitcoin } => { + let request = Request { + params: Params::default(), + cmd: Method::ExportBitcoinWallet, + shutdown: Shutdown::new(rx.subscribe()), + }; + let context = Context::build( Some(bitcoin), None, @@ -288,25 +317,24 @@ where debug, json, None, + rx, ) .await?; - let request = Request { - params: Params::default(), - cmd: Method::ExportBitcoinWallet, - }; (context, request) } CliCommand::MoneroRecovery { swap_id } => { - let context = - Context::build(None, None, None, data, is_testnet, debug, json, None).await?; - let request = Request { params: Params { swap_id: Some(swap_id.swap_id), ..Default::default() }, cmd: Method::MoneroRecovery, + shutdown: Shutdown::new(rx.subscribe()), }; + + let context = + Context::build(None, None, None, data, is_testnet, debug, json, None, rx).await?; + (context, request) } }; @@ -577,6 +605,8 @@ mod tests { use crate::api::api_test::*; use sequential_test::sequential; use crate::monero::monero_address::MoneroAddressNetworkMismatch; + use crate::api::Config; + use crate::fs::system_data_dir; const BINARY_NAME: &str = "swap"; const ARGS_DATA_DIR: &str = "/tmp/dir/"; @@ -595,12 +625,13 @@ mod tests { MULTI_ADDRESS, ]; - let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); + let (tx, _) = broadcast::channel(1); + let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap(); let (is_testnet, debug, json) = (false, false, false); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::buy_xmr(is_testnet), + Request::buy_xmr(is_testnet, tx.clone()), ); let (actual_config, actual_request) = match args { @@ -627,12 +658,13 @@ mod tests { MULTI_ADDRESS, ]; - let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); + let (tx, _) = broadcast::channel(1); + let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap(); let (is_testnet, debug, json) = (true, false, false); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::buy_xmr(is_testnet), + Request::buy_xmr(is_testnet, tx.clone()), ); let (actual_config, actual_request) = match args { @@ -658,7 +690,8 @@ mod tests { MULTI_ADDRESS, ]; - let err = parse_args_and_apply_defaults(raw_ars).await.unwrap_err(); + let (tx, _) = broadcast::channel(1); + let err = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap_err(); assert_eq!( err.downcast_ref::().unwrap(), @@ -684,7 +717,8 @@ mod tests { MULTI_ADDRESS, ]; - let err = parse_args_and_apply_defaults(raw_ars).await.unwrap_err(); + let (tx, _) = broadcast::channel(1); + let err = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap_err(); assert_eq!( err.downcast_ref::().unwrap(), @@ -700,12 +734,13 @@ mod tests { async fn given_resume_on_mainnet_then_defaults_to_mainnet() { let raw_ars = vec![BINARY_NAME, "resume", "--swap-id", SWAP_ID]; - let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); + let (tx, _) = broadcast::channel(1); + let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap(); let (is_testnet, debug, json) = (false, false, false); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::resume(), + Request::resume(tx.clone()), ); let (actual_config, actual_request) = match args { @@ -722,12 +757,13 @@ mod tests { async fn given_resume_on_testnet_then_defaults_to_testnet() { let raw_ars = vec![BINARY_NAME, "--testnet", "resume", "--swap-id", SWAP_ID]; - let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); + let (tx, _) = broadcast::channel(1); + let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap(); let (is_testnet, debug, json) = (true, false, false); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::resume(), + Request::resume(tx.clone()), ); let (actual_config, actual_request) = match args { @@ -744,13 +780,14 @@ mod tests { async fn given_cancel_on_mainnet_then_defaults_to_mainnet() { let raw_ars = vec![BINARY_NAME, "cancel", "--swap-id", SWAP_ID]; - let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); + let (tx, _) = broadcast::channel(1); + let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap(); let (is_testnet, debug, json) = (false, false, false); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::cancel(), + Request::cancel(tx.clone()), ); let (actual_config, actual_request) = match args { @@ -767,12 +804,13 @@ mod tests { async fn given_cancel_on_testnet_then_defaults_to_testnet() { let raw_ars = vec![BINARY_NAME, "--testnet", "cancel", "--swap-id", SWAP_ID]; - let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); + let (tx, _) = broadcast::channel(1); + let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap(); let (is_testnet, debug, json) = (true, false, false); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::cancel(), + Request::cancel(tx.clone()), ); let (actual_config, actual_request) = match args { @@ -789,12 +827,13 @@ mod tests { async fn given_refund_on_mainnet_then_defaults_to_mainnet() { let raw_ars = vec![BINARY_NAME, "refund", "--swap-id", SWAP_ID]; - let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); + let (tx, _) = broadcast::channel(1); + let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap(); let (is_testnet, debug, json) = (false, false, false); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::refund(), + Request::refund(tx.clone()), ); let (actual_config, actual_request) = match args { @@ -811,12 +850,13 @@ mod tests { async fn given_refund_on_testnet_then_defaults_to_testnet() { let raw_ars = vec![BINARY_NAME, "--testnet", "refund", "--swap-id", SWAP_ID]; - let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); + let (tx, _) = broadcast::channel(1); + let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap(); let (is_testnet, debug, json) = (true, false, false); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::refund(), + Request::refund(tx.clone()), ); let (actual_config, actual_request) = match args { @@ -844,13 +884,14 @@ mod tests { MULTI_ADDRESS, ]; - let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); + let (tx, _) = broadcast::channel(1); + let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap(); let (is_testnet, debug, json) = (false, false, false); let data_dir = PathBuf::from_str(ARGS_DATA_DIR).unwrap(); let (expected_config, expected_request) = ( Config::default(is_testnet, Some(data_dir.clone()), debug, json), - Request::buy_xmr(is_testnet), + Request::buy_xmr(is_testnet, tx.clone()), ); let (actual_config, actual_request) = match args { @@ -880,13 +921,14 @@ mod tests { MULTI_ADDRESS, ]; + let (tx, _) = broadcast::channel(1); let data_dir = PathBuf::from_str(ARGS_DATA_DIR).unwrap(); - let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); + let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap(); let (is_testnet, debug, json) = (true, false, false); let (expected_config, expected_request) = ( Config::default(is_testnet, Some(data_dir.clone()), debug, json), - Request::buy_xmr(is_testnet), + Request::buy_xmr(is_testnet, tx.clone()), ); let (actual_config, actual_request) = match args { @@ -911,13 +953,14 @@ mod tests { SWAP_ID, ]; + let (tx, _) = broadcast::channel(1); let data_dir = PathBuf::from_str(ARGS_DATA_DIR).unwrap(); - let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); + let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap(); let (is_testnet, debug, json) = (false, false, false); let (expected_config, expected_request) = ( Config::default(is_testnet, Some(data_dir.clone()), debug, json), - Request::resume(), + Request::resume(tx.clone()), ); let (actual_config, actual_request) = match args { @@ -943,13 +986,14 @@ mod tests { SWAP_ID, ]; + let (tx, _) = broadcast::channel(1); let data_dir = PathBuf::from_str(ARGS_DATA_DIR).unwrap(); - let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); + let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap(); let (is_testnet, debug, json) = (true, false, false); let (expected_config, expected_request) = ( Config::default(is_testnet, Some(data_dir.clone()), debug, json), - Request::resume(), + Request::resume(tx.clone()), ); let (actual_config, actual_request) = match args { @@ -976,12 +1020,13 @@ mod tests { MULTI_ADDRESS, ]; - let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); + let (tx, _) = broadcast::channel(1); + let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap(); let (is_testnet, debug, json) = (false, true, false); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::buy_xmr(is_testnet), + Request::buy_xmr(is_testnet, tx.clone()), ); let (actual_config, actual_request) = match args { @@ -1009,12 +1054,13 @@ mod tests { MULTI_ADDRESS, ]; - let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); + let (tx, _) = broadcast::channel(1); + let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap(); let (is_testnet, debug, json) = (true, true, false); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::buy_xmr(is_testnet), + Request::buy_xmr(is_testnet, tx.clone()), ); let (actual_config, actual_request) = match args { @@ -1032,12 +1078,13 @@ mod tests { let raw_ars = vec![BINARY_NAME, "--debug", "resume", "--swap-id", SWAP_ID]; - let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); + let (tx, _) = broadcast::channel(1); + let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap(); let (is_testnet, debug, json) = (false, true, false); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::resume(), + Request::resume(tx.clone()), ); let (actual_config, actual_request) = match args { @@ -1062,12 +1109,13 @@ mod tests { SWAP_ID, ]; - let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); + let (tx, _) = broadcast::channel(1); + let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap(); let (is_testnet, debug, json) = (true, true, false); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::resume(), + Request::resume(tx.clone()), ); let (actual_config, actual_request) = match args { @@ -1094,13 +1142,14 @@ mod tests { MULTI_ADDRESS, ]; - let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); + let (tx, _) = broadcast::channel(1); + let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap(); let (is_testnet, debug, json) = (false, false, true); let data_dir = data_dir_path_cli(is_testnet); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::buy_xmr(is_testnet), + Request::buy_xmr(is_testnet, tx.clone()), ); let (actual_config, actual_request) = match args { @@ -1128,12 +1177,13 @@ mod tests { MULTI_ADDRESS, ]; - let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); + let (tx, _) = broadcast::channel(1); + let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap(); let (is_testnet, debug, json) = (true, false, true); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::buy_xmr(is_testnet), + Request::buy_xmr(is_testnet, tx.clone()), ); let (actual_config, actual_request) = match args { @@ -1149,13 +1199,14 @@ mod tests { #[sequential] async fn given_resume_on_mainnet_with_json_then_json_set() { + let (tx, _) = broadcast::channel(1); let raw_ars = vec![BINARY_NAME, "--json", "resume", "--swap-id", SWAP_ID]; - let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); + let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap(); let (is_testnet, debug, json) = (false, false, true); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::resume(), + Request::resume(tx.clone()), ); let (actual_config, actual_request) = match args { @@ -1180,12 +1231,13 @@ mod tests { SWAP_ID, ]; - let args = parse_args_and_apply_defaults(raw_ars).await.unwrap(); + let (tx, _) = broadcast::channel(1); + let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap(); let (is_testnet, debug, json) = (true, false, true); let (expected_config, expected_request) = ( Config::default(is_testnet, None, debug, json), - Request::resume(), + Request::resume(tx.clone()), ); let (actual_config, actual_request) = match args { @@ -1210,7 +1262,8 @@ mod tests { "--seller", MULTI_ADDRESS, ]; - let result = parse_args_and_apply_defaults(raw_ars).await.unwrap_err(); + let (tx, _) = broadcast::channel(1); + let result = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap_err(); let raw_ars = vec![ BINARY_NAME, @@ -1222,7 +1275,7 @@ mod tests { "--seller", MULTI_ADDRESS, ]; - let result = parse_args_and_apply_defaults(raw_ars).await.unwrap_err(); + let result = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap_err(); let raw_ars = vec![ BINARY_NAME, @@ -1234,13 +1287,14 @@ mod tests { "--seller", MULTI_ADDRESS, ]; - let result = parse_args_and_apply_defaults(raw_ars).await.unwrap(); + let result = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap(); assert!(matches!(result, ParseResult::Context(_, _))); } #[tokio::test] #[sequential] async fn only_bech32_addresses_testnet_are_allowed() { + let (tx, _) = broadcast::channel(1); let raw_ars = vec![ BINARY_NAME, "--testnet", @@ -1252,7 +1306,7 @@ mod tests { "--seller", MULTI_ADDRESS, ]; - let result = parse_args_and_apply_defaults(raw_ars).await.unwrap_err(); + let result = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap_err(); let raw_ars = vec![ BINARY_NAME, @@ -1265,7 +1319,7 @@ mod tests { "--seller", MULTI_ADDRESS, ]; - let result = parse_args_and_apply_defaults(raw_ars).await.unwrap_err(); + let result = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap_err(); let raw_ars = vec![ BINARY_NAME, @@ -1278,7 +1332,7 @@ mod tests { "--seller", MULTI_ADDRESS, ]; - let result = parse_args_and_apply_defaults(raw_ars).await.unwrap(); + let result = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap(); assert!(matches!(result, ParseResult::Context(_, _))); } diff --git a/swap/src/rpc.rs b/swap/src/rpc.rs index d512a458..f926059e 100644 --- a/swap/src/rpc.rs +++ b/swap/src/rpc.rs @@ -1,5 +1,5 @@ use crate::api::Context; -use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle, RpcModule}; +use jsonrpsee::server::{ServerBuilder, ServerHandle, RpcModule}; use std::net::SocketAddr; use std::sync::Arc; use thiserror::Error; @@ -15,8 +15,8 @@ pub enum Error { pub async fn run_server( server_address: SocketAddr, context: Arc, -) -> anyhow::Result<(SocketAddr, HttpServerHandle)> { - let server = HttpServerBuilder::default().build(server_address).await?; +) -> anyhow::Result<(SocketAddr, ServerHandle)> { + let server = ServerBuilder::default().build(server_address).await?; let mut modules = RpcModule::new(()); { modules diff --git a/swap/src/rpc/methods.rs b/swap/src/rpc/methods.rs index d0887450..4cfb670f 100644 --- a/swap/src/rpc/methods.rs +++ b/swap/src/rpc/methods.rs @@ -1,10 +1,10 @@ use crate::api::{Context}; -use crate::api::request::{Params, Request, Method}; +use crate::api::request::{Params, Request, Method, Shutdown}; //use crate::rpc::Error; use anyhow::Result; use crate::{bitcoin, monero}; use crate::{bitcoin::bitcoin_address, monero::monero_address}; -use jsonrpsee::http_server::RpcModule; +use jsonrpsee::server::RpcModule; use libp2p::core::Multiaddr; use std::collections::HashMap; use std::str::FromStr; @@ -144,9 +144,10 @@ pub fn register_modules(context: Arc) -> RpcModule> { } async fn get_bitcoin_balance(context: &Arc) -> Result { - let request = Request { + let mut request = Request { params: Params::default(), cmd: Method::Balance, + shutdown: Shutdown::new(context.shutdown.subscribe()), }; let balance = request.call(Arc::clone(context)) .await @@ -156,9 +157,10 @@ async fn get_bitcoin_balance(context: &Arc) -> Result) -> Result { - let request = Request { + let mut request = Request { params: Params::default(), cmd: Method::History, + shutdown: Shutdown::new(context.shutdown.subscribe()), }; let history = request.call(Arc::clone(context)) .await @@ -167,9 +169,10 @@ async fn get_history(context: &Arc) -> Result) -> Result { - let request = Request { + let mut request = Request { params: Params::default(), cmd: Method::RawHistory, + shutdown: Shutdown::new(context.shutdown.subscribe()), }; let history = request.call(Arc::clone(context)) .await @@ -182,12 +185,13 @@ async fn get_seller( swap_id: Uuid, context: &Arc ) -> Result { - let request = Request { + let mut request = Request { params: Params { swap_id: Some(swap_id), ..Default::default() }, cmd: Method::GetSeller, + shutdown: Shutdown::new(context.shutdown.subscribe()), }; let result = request.call(Arc::clone(context)) .await @@ -200,12 +204,13 @@ async fn get_swap_start_date( swap_id: Uuid, context: &Arc ) -> Result { - let request = Request { + let mut request = Request { params: Params { swap_id: Some(swap_id), ..Default::default() }, cmd: Method::SwapStartDate, + shutdown: Shutdown::new(context.shutdown.subscribe()), }; let result = request.call(Arc::clone(context)) .await @@ -218,12 +223,13 @@ async fn resume_swap( swap_id: Uuid, context: &Arc, ) -> Result { - let request = Request { + let mut request = Request { params: Params { swap_id: Some(swap_id), ..Default::default() }, cmd: Method::Resume, + shutdown: Shutdown::new(context.shutdown.subscribe()), }; let result = request.call(Arc::clone(context)) @@ -236,13 +242,14 @@ async fn withdraw_btc( amount: Option, context: &Arc, ) -> Result { - let request = Request { + let mut request = Request { params: Params { amount, address: Some(withdraw_address), ..Default::default() }, cmd: Method::WithdrawBtc, + shutdown: Shutdown::new(context.shutdown.subscribe()), }; let result = request.call(Arc::clone(context)) .await @@ -256,7 +263,7 @@ async fn buy_xmr( seller: Multiaddr, context: &Arc, ) -> Result { - let request = Request { + let mut request = Request { params: Params { bitcoin_change_address: Some(bitcoin_change_address), monero_receive_address: Some(monero_receive_address), @@ -264,6 +271,7 @@ async fn buy_xmr( ..Default::default() }, cmd: Method::BuyXmr, + shutdown: Shutdown::new(context.shutdown.subscribe()), }; let swap = request.call(Arc::clone(context)) .await @@ -275,12 +283,13 @@ async fn list_sellers( rendezvous_point: Multiaddr, context: &Arc, ) -> Result { - let request = Request { + let mut request = Request { params: Params { rendezvous_point: Some(rendezvous_point), ..Default::default() }, cmd: Method::ListSellers, + shutdown: Shutdown::new(context.shutdown.subscribe()), }; let result = request.call(Arc::clone(context)) .await