add broadcast channel to handle shutdowns gracefully and prepare for RPC server test

This commit is contained in:
Lorenzo Tucci 2022-12-09 22:33:52 +01:00
parent aeeffccda2
commit cf5efa8ad0
No known key found for this signature in database
GPG key ID: D98C4FA2CDF590A0
8 changed files with 382 additions and 199 deletions

View file

@ -1,5 +1,5 @@
use crate::api::Context;
use crate::api::request::{Request, Params, Method};
use crate::api::request::{Request, Params, Method, Shutdown};
use crate::bitcoin::{Amount, bitcoin_address};
use crate::monero::monero_address;
use crate::monero;
@ -13,6 +13,7 @@ use std::sync::Arc;
use structopt::{clap, StructOpt};
use url::Url;
use uuid::Uuid;
use tokio::sync::broadcast;
// See: https://moneroworld.com/
pub const DEFAULT_MONERO_DAEMON_ADDRESS: &str = "node.community.rino.io:18081";
@ -41,7 +42,7 @@ pub enum ParseResult {
PrintAndExitZero { message: String },
}
pub async fn parse_args_and_apply_defaults<I, T>(raw_args: I) -> Result<ParseResult>
pub async fn parse_args_and_apply_defaults<I, T>(raw_args: I, rx: broadcast::Sender<()>) -> Result<ParseResult>
where
I: IntoIterator<Item = T>,
T: Into<OsString> + Clone,
@ -60,7 +61,6 @@ where
let json = args.json;
let is_testnet = args.testnet;
let data = args.data;
let (context, request) = match args.cmd {
CliCommand::BuyXmr {
seller: Seller { seller },
@ -70,18 +70,6 @@ where
monero_receive_address,
tor,
} => {
let context = Context::build(
Some(bitcoin),
Some(monero),
Some(tor),
data,
is_testnet,
debug,
json,
None,
)
.await?;
let monero_receive_address = monero_address::validate(monero_receive_address, is_testnet)?;
let bitcoin_change_address = bitcoin_address::validate(bitcoin_change_address, is_testnet)?;
@ -93,30 +81,52 @@ where
..Default::default()
},
cmd: Method::BuyXmr,
shutdown: Shutdown::new(rx.subscribe()),
};
let context = Context::build(
Some(bitcoin),
Some(monero),
Some(tor),
data,
is_testnet,
debug,
json,
None,
rx,
)
.await?;
(context, request)
}
CliCommand::History => {
let context =
Context::build(None, None, None, data, is_testnet, debug, json, None).await?;
let request = Request {
params: Params::default(),
cmd: Method::History,
shutdown: Shutdown::new(rx.subscribe()),
};
let context =
Context::build(None, None, None, data, is_testnet, debug, json, None, rx).await?;
(context, request)
}
CliCommand::Config => {
let context =
Context::build(None, None, None, data, is_testnet, debug, json, None).await?;
let request = Request {
params: Params::default(),
cmd: Method::Config,
shutdown: Shutdown::new(rx.subscribe()),
};
let context =
Context::build(None, None, None, data, is_testnet, debug, json, None, rx).await?;
(context, request)
}
CliCommand::Balance { bitcoin } => {
let request = Request {
params: Params::default(),
cmd: Method::Balance,
shutdown: Shutdown::new(rx.subscribe()),
};
let context = Context::build(
Some(bitcoin),
None,
@ -126,12 +136,9 @@ where
debug,
json,
None,
rx,
)
.await?;
let request = Request {
params: Params::default(),
cmd: Method::Balance,
};
(context, request)
}
CliCommand::StartDaemon {
@ -140,6 +147,12 @@ where
monero,
tor,
} => {
let request = Request {
params: Params::default(),
cmd: Method::StartDaemon,
shutdown: Shutdown::new(rx.subscribe()),
};
let context = Context::build(
Some(bitcoin),
Some(monero),
@ -149,12 +162,9 @@ where
debug,
json,
server_address,
rx,
)
.await?;
let request = Request {
params: Params::default(),
cmd: Method::StartDaemon,
};
(context, request)
}
CliCommand::WithdrawBtc {
@ -162,18 +172,6 @@ where
amount,
address,
} => {
let context = Context::build(
Some(bitcoin),
None,
None,
data,
is_testnet,
debug,
json,
None,
)
.await?;
let address = bitcoin_address::validate(address, is_testnet)?;
let request = Request {
@ -183,7 +181,21 @@ where
..Default::default()
},
cmd: Method::WithdrawBtc,
shutdown: Shutdown::new(rx.subscribe()),
};
let context = Context::build(
Some(bitcoin),
None,
None,
data,
is_testnet,
debug,
json,
None,
rx,
)
.await?;
(context, request)
}
CliCommand::Resume {
@ -192,6 +204,15 @@ where
monero,
tor,
} => {
let request = Request {
params: Params {
swap_id: Some(swap_id),
..Default::default()
},
cmd: Method::Resume,
shutdown: Shutdown::new(rx.subscribe()),
};
let context = Context::build(
Some(bitcoin),
Some(monero),
@ -201,15 +222,9 @@ where
debug,
json,
None,
rx,
)
.await?;
let request = Request {
params: Params {
swap_id: Some(swap_id),
..Default::default()
},
cmd: Method::Resume,
};
(context, request)
}
CliCommand::Cancel {
@ -217,6 +232,15 @@ where
bitcoin,
tor,
} => {
let request = Request {
params: Params {
swap_id: Some(swap_id),
..Default::default()
},
cmd: Method::Cancel,
shutdown: Shutdown::new(rx.subscribe()),
};
let context = Context::build(
Some(bitcoin),
None,
@ -226,15 +250,9 @@ where
debug,
json,
None,
rx,
)
.await?;
let request = Request {
params: Params {
swap_id: Some(swap_id),
..Default::default()
},
cmd: Method::Cancel,
};
(context, request)
}
CliCommand::Refund {
@ -242,6 +260,15 @@ where
bitcoin,
tor,
} => {
let request = Request {
params: Params {
swap_id: Some(swap_id),
..Default::default()
},
cmd: Method::Refund,
shutdown: Shutdown::new(rx.subscribe()),
};
let context = Context::build(
Some(bitcoin),
None,
@ -251,34 +278,36 @@ where
debug,
json,
None,
rx,
)
.await?;
let request = Request {
params: Params {
swap_id: Some(swap_id),
..Default::default()
},
cmd: Method::Refund,
};
(context, request)
}
CliCommand::ListSellers {
rendezvous_point,
tor,
} => {
let context =
Context::build(None, None, Some(tor), data, is_testnet, debug, json, None).await?;
let request = Request {
params: Params {
rendezvous_point: Some(rendezvous_point),
..Default::default()
},
cmd: Method::ListSellers,
shutdown: Shutdown::new(rx.subscribe()),
};
let context =
Context::build(None, None, Some(tor), data, is_testnet, debug, json, None, rx).await?;
(context, request)
}
CliCommand::ExportBitcoinWallet { bitcoin } => {
let request = Request {
params: Params::default(),
cmd: Method::ExportBitcoinWallet,
shutdown: Shutdown::new(rx.subscribe()),
};
let context = Context::build(
Some(bitcoin),
None,
@ -288,25 +317,24 @@ where
debug,
json,
None,
rx,
)
.await?;
let request = Request {
params: Params::default(),
cmd: Method::ExportBitcoinWallet,
};
(context, request)
}
CliCommand::MoneroRecovery { swap_id } => {
let context =
Context::build(None, None, None, data, is_testnet, debug, json, None).await?;
let request = Request {
params: Params {
swap_id: Some(swap_id.swap_id),
..Default::default()
},
cmd: Method::MoneroRecovery,
shutdown: Shutdown::new(rx.subscribe()),
};
let context =
Context::build(None, None, None, data, is_testnet, debug, json, None, rx).await?;
(context, request)
}
};
@ -577,6 +605,8 @@ mod tests {
use crate::api::api_test::*;
use sequential_test::sequential;
use crate::monero::monero_address::MoneroAddressNetworkMismatch;
use crate::api::Config;
use crate::fs::system_data_dir;
const BINARY_NAME: &str = "swap";
const ARGS_DATA_DIR: &str = "/tmp/dir/";
@ -595,12 +625,13 @@ mod tests {
MULTI_ADDRESS,
];
let args = parse_args_and_apply_defaults(raw_ars).await.unwrap();
let (tx, _) = broadcast::channel(1);
let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap();
let (is_testnet, debug, json) = (false, false, false);
let (expected_config, expected_request) = (
Config::default(is_testnet, None, debug, json),
Request::buy_xmr(is_testnet),
Request::buy_xmr(is_testnet, tx.clone()),
);
let (actual_config, actual_request) = match args {
@ -627,12 +658,13 @@ mod tests {
MULTI_ADDRESS,
];
let args = parse_args_and_apply_defaults(raw_ars).await.unwrap();
let (tx, _) = broadcast::channel(1);
let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap();
let (is_testnet, debug, json) = (true, false, false);
let (expected_config, expected_request) = (
Config::default(is_testnet, None, debug, json),
Request::buy_xmr(is_testnet),
Request::buy_xmr(is_testnet, tx.clone()),
);
let (actual_config, actual_request) = match args {
@ -658,7 +690,8 @@ mod tests {
MULTI_ADDRESS,
];
let err = parse_args_and_apply_defaults(raw_ars).await.unwrap_err();
let (tx, _) = broadcast::channel(1);
let err = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap_err();
assert_eq!(
err.downcast_ref::<MoneroAddressNetworkMismatch>().unwrap(),
@ -684,7 +717,8 @@ mod tests {
MULTI_ADDRESS,
];
let err = parse_args_and_apply_defaults(raw_ars).await.unwrap_err();
let (tx, _) = broadcast::channel(1);
let err = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap_err();
assert_eq!(
err.downcast_ref::<MoneroAddressNetworkMismatch>().unwrap(),
@ -700,12 +734,13 @@ mod tests {
async fn given_resume_on_mainnet_then_defaults_to_mainnet() {
let raw_ars = vec![BINARY_NAME, "resume", "--swap-id", SWAP_ID];
let args = parse_args_and_apply_defaults(raw_ars).await.unwrap();
let (tx, _) = broadcast::channel(1);
let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap();
let (is_testnet, debug, json) = (false, false, false);
let (expected_config, expected_request) = (
Config::default(is_testnet, None, debug, json),
Request::resume(),
Request::resume(tx.clone()),
);
let (actual_config, actual_request) = match args {
@ -722,12 +757,13 @@ mod tests {
async fn given_resume_on_testnet_then_defaults_to_testnet() {
let raw_ars = vec![BINARY_NAME, "--testnet", "resume", "--swap-id", SWAP_ID];
let args = parse_args_and_apply_defaults(raw_ars).await.unwrap();
let (tx, _) = broadcast::channel(1);
let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap();
let (is_testnet, debug, json) = (true, false, false);
let (expected_config, expected_request) = (
Config::default(is_testnet, None, debug, json),
Request::resume(),
Request::resume(tx.clone()),
);
let (actual_config, actual_request) = match args {
@ -744,13 +780,14 @@ mod tests {
async fn given_cancel_on_mainnet_then_defaults_to_mainnet() {
let raw_ars = vec![BINARY_NAME, "cancel", "--swap-id", SWAP_ID];
let args = parse_args_and_apply_defaults(raw_ars).await.unwrap();
let (tx, _) = broadcast::channel(1);
let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap();
let (is_testnet, debug, json) = (false, false, false);
let (expected_config, expected_request) = (
Config::default(is_testnet, None, debug, json),
Request::cancel(),
Request::cancel(tx.clone()),
);
let (actual_config, actual_request) = match args {
@ -767,12 +804,13 @@ mod tests {
async fn given_cancel_on_testnet_then_defaults_to_testnet() {
let raw_ars = vec![BINARY_NAME, "--testnet", "cancel", "--swap-id", SWAP_ID];
let args = parse_args_and_apply_defaults(raw_ars).await.unwrap();
let (tx, _) = broadcast::channel(1);
let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap();
let (is_testnet, debug, json) = (true, false, false);
let (expected_config, expected_request) = (
Config::default(is_testnet, None, debug, json),
Request::cancel(),
Request::cancel(tx.clone()),
);
let (actual_config, actual_request) = match args {
@ -789,12 +827,13 @@ mod tests {
async fn given_refund_on_mainnet_then_defaults_to_mainnet() {
let raw_ars = vec![BINARY_NAME, "refund", "--swap-id", SWAP_ID];
let args = parse_args_and_apply_defaults(raw_ars).await.unwrap();
let (tx, _) = broadcast::channel(1);
let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap();
let (is_testnet, debug, json) = (false, false, false);
let (expected_config, expected_request) = (
Config::default(is_testnet, None, debug, json),
Request::refund(),
Request::refund(tx.clone()),
);
let (actual_config, actual_request) = match args {
@ -811,12 +850,13 @@ mod tests {
async fn given_refund_on_testnet_then_defaults_to_testnet() {
let raw_ars = vec![BINARY_NAME, "--testnet", "refund", "--swap-id", SWAP_ID];
let args = parse_args_and_apply_defaults(raw_ars).await.unwrap();
let (tx, _) = broadcast::channel(1);
let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap();
let (is_testnet, debug, json) = (true, false, false);
let (expected_config, expected_request) = (
Config::default(is_testnet, None, debug, json),
Request::refund(),
Request::refund(tx.clone()),
);
let (actual_config, actual_request) = match args {
@ -844,13 +884,14 @@ mod tests {
MULTI_ADDRESS,
];
let args = parse_args_and_apply_defaults(raw_ars).await.unwrap();
let (tx, _) = broadcast::channel(1);
let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap();
let (is_testnet, debug, json) = (false, false, false);
let data_dir = PathBuf::from_str(ARGS_DATA_DIR).unwrap();
let (expected_config, expected_request) = (
Config::default(is_testnet, Some(data_dir.clone()), debug, json),
Request::buy_xmr(is_testnet),
Request::buy_xmr(is_testnet, tx.clone()),
);
let (actual_config, actual_request) = match args {
@ -880,13 +921,14 @@ mod tests {
MULTI_ADDRESS,
];
let (tx, _) = broadcast::channel(1);
let data_dir = PathBuf::from_str(ARGS_DATA_DIR).unwrap();
let args = parse_args_and_apply_defaults(raw_ars).await.unwrap();
let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap();
let (is_testnet, debug, json) = (true, false, false);
let (expected_config, expected_request) = (
Config::default(is_testnet, Some(data_dir.clone()), debug, json),
Request::buy_xmr(is_testnet),
Request::buy_xmr(is_testnet, tx.clone()),
);
let (actual_config, actual_request) = match args {
@ -911,13 +953,14 @@ mod tests {
SWAP_ID,
];
let (tx, _) = broadcast::channel(1);
let data_dir = PathBuf::from_str(ARGS_DATA_DIR).unwrap();
let args = parse_args_and_apply_defaults(raw_ars).await.unwrap();
let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap();
let (is_testnet, debug, json) = (false, false, false);
let (expected_config, expected_request) = (
Config::default(is_testnet, Some(data_dir.clone()), debug, json),
Request::resume(),
Request::resume(tx.clone()),
);
let (actual_config, actual_request) = match args {
@ -943,13 +986,14 @@ mod tests {
SWAP_ID,
];
let (tx, _) = broadcast::channel(1);
let data_dir = PathBuf::from_str(ARGS_DATA_DIR).unwrap();
let args = parse_args_and_apply_defaults(raw_ars).await.unwrap();
let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap();
let (is_testnet, debug, json) = (true, false, false);
let (expected_config, expected_request) = (
Config::default(is_testnet, Some(data_dir.clone()), debug, json),
Request::resume(),
Request::resume(tx.clone()),
);
let (actual_config, actual_request) = match args {
@ -976,12 +1020,13 @@ mod tests {
MULTI_ADDRESS,
];
let args = parse_args_and_apply_defaults(raw_ars).await.unwrap();
let (tx, _) = broadcast::channel(1);
let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap();
let (is_testnet, debug, json) = (false, true, false);
let (expected_config, expected_request) = (
Config::default(is_testnet, None, debug, json),
Request::buy_xmr(is_testnet),
Request::buy_xmr(is_testnet, tx.clone()),
);
let (actual_config, actual_request) = match args {
@ -1009,12 +1054,13 @@ mod tests {
MULTI_ADDRESS,
];
let args = parse_args_and_apply_defaults(raw_ars).await.unwrap();
let (tx, _) = broadcast::channel(1);
let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap();
let (is_testnet, debug, json) = (true, true, false);
let (expected_config, expected_request) = (
Config::default(is_testnet, None, debug, json),
Request::buy_xmr(is_testnet),
Request::buy_xmr(is_testnet, tx.clone()),
);
let (actual_config, actual_request) = match args {
@ -1032,12 +1078,13 @@ mod tests {
let raw_ars = vec![BINARY_NAME, "--debug", "resume", "--swap-id", SWAP_ID];
let args = parse_args_and_apply_defaults(raw_ars).await.unwrap();
let (tx, _) = broadcast::channel(1);
let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap();
let (is_testnet, debug, json) = (false, true, false);
let (expected_config, expected_request) = (
Config::default(is_testnet, None, debug, json),
Request::resume(),
Request::resume(tx.clone()),
);
let (actual_config, actual_request) = match args {
@ -1062,12 +1109,13 @@ mod tests {
SWAP_ID,
];
let args = parse_args_and_apply_defaults(raw_ars).await.unwrap();
let (tx, _) = broadcast::channel(1);
let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap();
let (is_testnet, debug, json) = (true, true, false);
let (expected_config, expected_request) = (
Config::default(is_testnet, None, debug, json),
Request::resume(),
Request::resume(tx.clone()),
);
let (actual_config, actual_request) = match args {
@ -1094,13 +1142,14 @@ mod tests {
MULTI_ADDRESS,
];
let args = parse_args_and_apply_defaults(raw_ars).await.unwrap();
let (tx, _) = broadcast::channel(1);
let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap();
let (is_testnet, debug, json) = (false, false, true);
let data_dir = data_dir_path_cli(is_testnet);
let (expected_config, expected_request) = (
Config::default(is_testnet, None, debug, json),
Request::buy_xmr(is_testnet),
Request::buy_xmr(is_testnet, tx.clone()),
);
let (actual_config, actual_request) = match args {
@ -1128,12 +1177,13 @@ mod tests {
MULTI_ADDRESS,
];
let args = parse_args_and_apply_defaults(raw_ars).await.unwrap();
let (tx, _) = broadcast::channel(1);
let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap();
let (is_testnet, debug, json) = (true, false, true);
let (expected_config, expected_request) = (
Config::default(is_testnet, None, debug, json),
Request::buy_xmr(is_testnet),
Request::buy_xmr(is_testnet, tx.clone()),
);
let (actual_config, actual_request) = match args {
@ -1149,13 +1199,14 @@ mod tests {
#[sequential]
async fn given_resume_on_mainnet_with_json_then_json_set() {
let (tx, _) = broadcast::channel(1);
let raw_ars = vec![BINARY_NAME, "--json", "resume", "--swap-id", SWAP_ID];
let args = parse_args_and_apply_defaults(raw_ars).await.unwrap();
let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap();
let (is_testnet, debug, json) = (false, false, true);
let (expected_config, expected_request) = (
Config::default(is_testnet, None, debug, json),
Request::resume(),
Request::resume(tx.clone()),
);
let (actual_config, actual_request) = match args {
@ -1180,12 +1231,13 @@ mod tests {
SWAP_ID,
];
let args = parse_args_and_apply_defaults(raw_ars).await.unwrap();
let (tx, _) = broadcast::channel(1);
let args = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap();
let (is_testnet, debug, json) = (true, false, true);
let (expected_config, expected_request) = (
Config::default(is_testnet, None, debug, json),
Request::resume(),
Request::resume(tx.clone()),
);
let (actual_config, actual_request) = match args {
@ -1210,7 +1262,8 @@ mod tests {
"--seller",
MULTI_ADDRESS,
];
let result = parse_args_and_apply_defaults(raw_ars).await.unwrap_err();
let (tx, _) = broadcast::channel(1);
let result = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap_err();
let raw_ars = vec![
BINARY_NAME,
@ -1222,7 +1275,7 @@ mod tests {
"--seller",
MULTI_ADDRESS,
];
let result = parse_args_and_apply_defaults(raw_ars).await.unwrap_err();
let result = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap_err();
let raw_ars = vec![
BINARY_NAME,
@ -1234,13 +1287,14 @@ mod tests {
"--seller",
MULTI_ADDRESS,
];
let result = parse_args_and_apply_defaults(raw_ars).await.unwrap();
let result = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap();
assert!(matches!(result, ParseResult::Context(_, _)));
}
#[tokio::test]
#[sequential]
async fn only_bech32_addresses_testnet_are_allowed() {
let (tx, _) = broadcast::channel(1);
let raw_ars = vec![
BINARY_NAME,
"--testnet",
@ -1252,7 +1306,7 @@ mod tests {
"--seller",
MULTI_ADDRESS,
];
let result = parse_args_and_apply_defaults(raw_ars).await.unwrap_err();
let result = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap_err();
let raw_ars = vec![
BINARY_NAME,
@ -1265,7 +1319,7 @@ mod tests {
"--seller",
MULTI_ADDRESS,
];
let result = parse_args_and_apply_defaults(raw_ars).await.unwrap_err();
let result = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap_err();
let raw_ars = vec![
BINARY_NAME,
@ -1278,7 +1332,7 @@ mod tests {
"--seller",
MULTI_ADDRESS,
];
let result = parse_args_and_apply_defaults(raw_ars).await.unwrap();
let result = parse_args_and_apply_defaults(raw_ars, tx.clone()).await.unwrap();
assert!(matches!(result, ParseResult::Context(_, _)));
}