mirror of
https://github.com/comit-network/xmr-btc-swap.git
synced 2025-08-07 05:52:31 -04:00
Merge branch 'rpc-server' of https://github.com/yamabiiko/xmr-btc-swap into rpc-server
This commit is contained in:
commit
ec65ea2b27
5 changed files with 94 additions and 91 deletions
|
@ -12,7 +12,7 @@ use std::fmt;
|
|||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{Arc, Once};
|
||||
use tokio::sync::{broadcast, Mutex};
|
||||
use tokio::sync::{broadcast, Mutex, broadcast::Receiver, broadcast::Sender};
|
||||
use url::Url;
|
||||
|
||||
static START: Once = Once::new();
|
||||
|
@ -30,6 +30,43 @@ pub struct Config {
|
|||
pub is_testnet: bool,
|
||||
}
|
||||
|
||||
impl Shutdown {
|
||||
pub fn new(listen: Receiver<()>) -> Shutdown {
|
||||
let (notify, _) = broadcast::channel(16);
|
||||
Shutdown {
|
||||
shutdown: Mutex::new(false),
|
||||
listen: Mutex::new(listen),
|
||||
notify
|
||||
}
|
||||
}
|
||||
|
||||
/// Receive the shutdown notice, waiting if necessary.
|
||||
pub async fn recv(&self) {
|
||||
// If the shutdown signal has already been received, then return
|
||||
// immediately.
|
||||
let mut guard_shutdown = self.shutdown.lock().await;
|
||||
if *guard_shutdown {
|
||||
return;
|
||||
}
|
||||
|
||||
let _ = self.listen.lock().await.recv().await;
|
||||
|
||||
// Remember that the signal has been received.
|
||||
*guard_shutdown = true;
|
||||
|
||||
// Send shutdown request to child tasks
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Shutdown {
|
||||
shutdown: Mutex<bool>,
|
||||
listen: Mutex<Receiver<()>>,
|
||||
notify: Sender<()>,
|
||||
}
|
||||
|
||||
|
||||
// workaround for warning over monero_rpc_process which we must own but not read
|
||||
#[allow(dead_code)]
|
||||
pub struct Context {
|
||||
|
@ -37,9 +74,8 @@ pub struct Context {
|
|||
bitcoin_wallet: Option<Arc<bitcoin::Wallet>>,
|
||||
monero_wallet: Option<Arc<monero::Wallet>>,
|
||||
monero_rpc_process: Option<monero::WalletRpcProcess>,
|
||||
running_swap: Arc<Mutex<bool>>,
|
||||
pub config: Config,
|
||||
pub shutdown: Arc<broadcast::Sender<()>>,
|
||||
pub shutdown: Arc<Shutdown>,
|
||||
}
|
||||
|
||||
impl Context {
|
||||
|
@ -52,7 +88,7 @@ impl Context {
|
|||
debug: bool,
|
||||
json: bool,
|
||||
server_address: Option<SocketAddr>,
|
||||
shutdown: broadcast::Sender<()>,
|
||||
sender: broadcast::Sender<()>,
|
||||
) -> Result<Context> {
|
||||
let data_dir = data::data_dir_from(data, is_testnet)?;
|
||||
let env_config = env_config_from(is_testnet);
|
||||
|
@ -112,8 +148,7 @@ impl Context {
|
|||
is_testnet,
|
||||
data_dir,
|
||||
},
|
||||
shutdown: Arc::new(shutdown),
|
||||
running_swap: Arc::new(Mutex::new(false)),
|
||||
shutdown: Arc::new(Shutdown::new(sender.subscribe())),
|
||||
};
|
||||
|
||||
Ok(context)
|
||||
|
@ -265,7 +300,6 @@ pub mod api_test {
|
|||
};
|
||||
|
||||
Request::new(
|
||||
tx.subscribe(),
|
||||
Method::BuyXmr,
|
||||
Params {
|
||||
seller: Some(seller),
|
||||
|
@ -277,9 +311,8 @@ pub mod api_test {
|
|||
)
|
||||
}
|
||||
|
||||
pub fn resume(tx: broadcast::Sender<()>) -> Request {
|
||||
pub fn resume() -> Request {
|
||||
Request::new(
|
||||
tx.subscribe(),
|
||||
Method::Resume,
|
||||
Params {
|
||||
swap_id: Some(Uuid::from_str(SWAP_ID).unwrap()),
|
||||
|
@ -288,9 +321,8 @@ pub mod api_test {
|
|||
)
|
||||
}
|
||||
|
||||
pub fn cancel(tx: broadcast::Sender<()>) -> Request {
|
||||
pub fn cancel() -> Request {
|
||||
Request::new(
|
||||
tx.subscribe(),
|
||||
Method::CancelAndRefund,
|
||||
Params {
|
||||
swap_id: Some(Uuid::from_str(SWAP_ID).unwrap()),
|
||||
|
@ -299,9 +331,8 @@ pub mod api_test {
|
|||
)
|
||||
}
|
||||
|
||||
pub fn refund(tx: broadcast::Sender<()>) -> Request {
|
||||
pub fn refund() -> Request {
|
||||
Request::new(
|
||||
tx.subscribe(),
|
||||
Method::CancelAndRefund,
|
||||
Params {
|
||||
swap_id: Some(Uuid::from_str(SWAP_ID).unwrap()),
|
||||
|
|
|
@ -20,10 +20,9 @@ use std::net::SocketAddr;
|
|||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use structopt::lazy_static::lazy_static;
|
||||
use tokio::sync::broadcast::Receiver;
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::{debug_span, Instrument};
|
||||
use uuid::Uuid;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
lazy_static! {
|
||||
static ref SWAP_LOCK: RwLock<Option<Uuid>> = RwLock::new(None);
|
||||
|
@ -31,51 +30,9 @@ lazy_static! {
|
|||
|
||||
#[derive(PartialEq, Debug)]
|
||||
pub struct Request {
|
||||
pub cmd: Method,
|
||||
pub shutdown: Shutdown,
|
||||
pub cmd: Method
|
||||
}
|
||||
|
||||
impl Shutdown {
|
||||
pub fn new(notify: Receiver<()>) -> Shutdown {
|
||||
Shutdown {
|
||||
shutdown: false,
|
||||
notify,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns `true` if the shutdown signal has been received.
|
||||
pub fn is_shutdown(&self) -> bool {
|
||||
self.shutdown
|
||||
}
|
||||
|
||||
/// Receive the shutdown notice, waiting if necessary.
|
||||
pub async fn recv(&mut self) {
|
||||
// If the shutdown signal has already been received, then return
|
||||
// immediately.
|
||||
if self.shutdown {
|
||||
return;
|
||||
}
|
||||
|
||||
// Cannot receive a "lag error" as only one value is ever sent.
|
||||
let _ = self.notify.recv().await;
|
||||
|
||||
self.shutdown = true;
|
||||
|
||||
// Remember that the signal has been received.
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Shutdown {
|
||||
shutdown: bool,
|
||||
notify: Receiver<()>,
|
||||
}
|
||||
|
||||
impl PartialEq for Shutdown {
|
||||
fn eq(&self, other: &Shutdown) -> bool {
|
||||
self.shutdown == other.shutdown
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum Method {
|
||||
|
@ -116,10 +73,9 @@ pub enum Method {
|
|||
}
|
||||
|
||||
impl Request {
|
||||
pub fn new(shutdownReceiver: Receiver<()>, cmd: Method) -> Request {
|
||||
pub fn new(cmd: Method) -> Request {
|
||||
Request {
|
||||
cmd,
|
||||
shutdown: Shutdown::new(shutdownReceiver),
|
||||
cmd
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -132,7 +88,7 @@ impl Request {
|
|||
}
|
||||
}
|
||||
|
||||
async fn handle_cmd(mut self, context: Arc<Context>) -> Result<serde_json::Value> {
|
||||
async fn handle_cmd(self, context: Arc<Context>) -> Result<serde_json::Value> {
|
||||
match self.cmd {
|
||||
Method::GetSwapInfo { swap_id } => {
|
||||
let bitcoin_wallet = context
|
||||
|
@ -287,17 +243,37 @@ impl Request {
|
|||
bitcoin_change_address,
|
||||
amount,
|
||||
);
|
||||
let mut halt = context.shutdown.notify.subscribe();
|
||||
|
||||
tokio::select! {
|
||||
result = event_loop => {
|
||||
result
|
||||
.context("EventLoop panicked")?;
|
||||
},
|
||||
result = bob::run(swap) => {
|
||||
result
|
||||
.context("Failed to complete swap")?;
|
||||
// execution will halt if the server daemon is stopped or a cancel running swap
|
||||
// request is sent
|
||||
tokio::spawn(async move {
|
||||
tokio::select! {
|
||||
result = event_loop => {
|
||||
match result {
|
||||
Ok(_) => {
|
||||
tracing::debug!(%swap_id, "EventLoop completed")
|
||||
}
|
||||
Err(error) => {
|
||||
tracing::error!(%swap_id, "EventLoop failed: {:#}", error)
|
||||
}
|
||||
}
|
||||
},
|
||||
result = bob::run(swap) => {
|
||||
match result {
|
||||
Ok(state) => {
|
||||
tracing::debug!(%swap_id, state=%state, "Swap completed")
|
||||
}
|
||||
Err(error) => {
|
||||
tracing::error!(%swap_id, "Failed to complete swap: {:#}", error)
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = halt.recv() => {
|
||||
tracing::debug!(%swap_id, "Swap cancel signal received while running swap")
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
Ok(json!({
|
||||
"empty": "true"
|
||||
}))
|
||||
|
@ -314,6 +290,7 @@ impl Request {
|
|||
}
|
||||
Method::RawHistory => {
|
||||
let raw_history = context.db.raw_all().await?;
|
||||
|
||||
Ok(json!({ "raw_history": raw_history }))
|
||||
}
|
||||
Method::Config => {
|
||||
|
@ -370,9 +347,11 @@ impl Request {
|
|||
rpc::run_server(server_address, Arc::clone(&context)).await?;
|
||||
|
||||
loop {
|
||||
let shutdown = Arc::clone(&context.shutdown);
|
||||
tokio::select! {
|
||||
_ = self.shutdown.recv() => {
|
||||
_ = shutdown.recv() => {
|
||||
server_handle.stop()?;
|
||||
context.shutdown.notify.send(())?;
|
||||
return Ok(json!({
|
||||
"result": []
|
||||
}))
|
||||
|
@ -397,7 +376,7 @@ impl Request {
|
|||
"balance": bitcoin_balance.to_sat()
|
||||
}))
|
||||
}
|
||||
Method::Resume { swap_id } => {
|
||||
Method::Resume {swap_id} => {
|
||||
let seller_peer_id = context.db.get_peer_id(swap_id).await?;
|
||||
let seller_addresses = context.db.get_addresses(seller_peer_id).await?;
|
||||
|
||||
|
@ -427,7 +406,7 @@ impl Request {
|
|||
.context("Could not get Tor SOCKS5 port")?,
|
||||
behaviour,
|
||||
)
|
||||
.await?;
|
||||
.await?;
|
||||
let our_peer_id = swarm.local_peer_id();
|
||||
|
||||
tracing::debug!(peer_id = %our_peer_id, "Network layer initialized");
|
||||
|
@ -462,7 +441,7 @@ impl Request {
|
|||
event_loop_handle,
|
||||
monero_receive_address,
|
||||
)
|
||||
.await?;
|
||||
.await?;
|
||||
|
||||
tokio::select! {
|
||||
event_loop_result = handle => {
|
||||
|
@ -476,7 +455,7 @@ impl Request {
|
|||
"result": []
|
||||
}))
|
||||
}
|
||||
Method::CancelAndRefund { swap_id } => {
|
||||
Method::CancelAndRefund {swap_id} => {
|
||||
let bitcoin_wallet = context
|
||||
.bitcoin_wallet
|
||||
.as_ref()
|
||||
|
@ -487,7 +466,7 @@ impl Request {
|
|||
Arc::clone(bitcoin_wallet),
|
||||
Arc::clone(&context.db),
|
||||
)
|
||||
.await?;
|
||||
.await?;
|
||||
|
||||
Ok(json!({
|
||||
"result": state,
|
||||
|
@ -515,7 +494,7 @@ impl Request {
|
|||
.context("Could not get Tor SOCKS5 port")?,
|
||||
identity,
|
||||
)
|
||||
.await?;
|
||||
.await?;
|
||||
|
||||
for seller in &sellers {
|
||||
match seller.status {
|
||||
|
|
|
@ -22,7 +22,7 @@ use tokio::sync::broadcast;
|
|||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let (tx, _) = broadcast::channel(1);
|
||||
let (context, request) = match parse_args_and_apply_defaults(env::args_os(), tx).await? {
|
||||
let (context, mut request) = match parse_args_and_apply_defaults(env::args_os(), tx.clone()).await? {
|
||||
ParseResult::Context(context, request) => (context, request),
|
||||
ParseResult::PrintAndExitZero { message } => {
|
||||
println!("{}", message);
|
||||
|
@ -34,6 +34,7 @@ async fn main() -> Result<()> {
|
|||
eprintln!("{}", e);
|
||||
}
|
||||
let _result = request.call(Arc::clone(&context)).await?;
|
||||
tx.send(())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
|
@ -79,7 +79,6 @@ where
|
|||
bitcoin_address::validate(bitcoin_change_address, is_testnet)?;
|
||||
|
||||
let request = Request::new(
|
||||
rx.subscribe(),
|
||||
Method::BuyXmr {
|
||||
seller,
|
||||
bitcoin_change_address,
|
||||
|
@ -103,21 +102,21 @@ where
|
|||
(context, request)
|
||||
}
|
||||
CliCommand::History => {
|
||||
let request = Request::new(rx.subscribe(), Method::History);
|
||||
let request = Request::new(Method::History);
|
||||
|
||||
let context =
|
||||
Context::build(None, None, None, data, is_testnet, debug, json, None, rx).await?;
|
||||
(context, request)
|
||||
}
|
||||
CliCommand::Config => {
|
||||
let request = Request::new(rx.subscribe(), Method::Config);
|
||||
let request = Request::new(Method::Config);
|
||||
|
||||
let context =
|
||||
Context::build(None, None, None, data, is_testnet, debug, json, None, rx).await?;
|
||||
(context, request)
|
||||
}
|
||||
CliCommand::Balance { bitcoin } => {
|
||||
let request = Request::new(rx.subscribe(), Method::Balance);
|
||||
let request = Request::new(Method::Balance);
|
||||
|
||||
let context = Context::build(
|
||||
Some(bitcoin),
|
||||
|
@ -140,7 +139,6 @@ where
|
|||
tor,
|
||||
} => {
|
||||
let request = Request::new(
|
||||
rx.subscribe(),
|
||||
Method::StartDaemon {
|
||||
server_address
|
||||
}
|
||||
|
@ -168,7 +166,6 @@ where
|
|||
let address = bitcoin_address::validate(address, is_testnet)?;
|
||||
|
||||
let request = Request::new(
|
||||
rx.subscribe(),
|
||||
Method::WithdrawBtc {
|
||||
amount,
|
||||
address,
|
||||
|
@ -196,7 +193,6 @@ where
|
|||
tor,
|
||||
} => {
|
||||
let request = Request::new(
|
||||
rx.subscribe(),
|
||||
Method::Resume {
|
||||
swap_id
|
||||
}
|
||||
|
@ -222,7 +218,6 @@ where
|
|||
tor,
|
||||
} => {
|
||||
let request = Request::new(
|
||||
rx.subscribe(),
|
||||
Method::CancelAndRefund {
|
||||
swap_id
|
||||
}
|
||||
|
@ -247,7 +242,6 @@ where
|
|||
tor,
|
||||
} => {
|
||||
let request = Request::new(
|
||||
rx.subscribe(),
|
||||
Method::ListSellers {
|
||||
rendezvous_point
|
||||
}
|
||||
|
@ -270,7 +264,6 @@ where
|
|||
}
|
||||
CliCommand::ExportBitcoinWallet { bitcoin } => {
|
||||
let request = Request::new(
|
||||
rx.subscribe(),
|
||||
Method::ExportBitcoinWallet,
|
||||
);
|
||||
|
||||
|
@ -292,7 +285,6 @@ where
|
|||
swap_id: SwapId { swap_id },
|
||||
} => {
|
||||
let request = Request::new(
|
||||
rx.subscribe(),
|
||||
Method::MoneroRecovery {
|
||||
swap_id
|
||||
}
|
||||
|
|
|
@ -159,7 +159,7 @@ async fn execute_request(
|
|||
cmd: Method,
|
||||
context: &Arc<Context>,
|
||||
) -> Result<serde_json::Value, jsonrpsee_core::Error> {
|
||||
let request = Request::new(context.shutdown.subscribe(), cmd);
|
||||
let request = Request::new(cmd);
|
||||
request
|
||||
.call(Arc::clone(context))
|
||||
.await
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue