From f26b27c2664f423b7fb721c3eacaab0ab225075c Mon Sep 17 00:00:00 2001 From: Lorenzo Tucci Date: Tue, 22 Nov 2022 19:01:13 +0100 Subject: [PATCH] writing async rpc methods and using arc for shared struct references --- swap/src/api.rs | 117 ++++++++++++++++++++++++++++++++---- swap/src/bin/swap.rs | 3 +- swap/src/rpc.rs | 5 +- swap/src/rpc/methods.rs | 130 ++++++++++++++++++++++++++++++++++++---- 4 files changed, 231 insertions(+), 24 deletions(-) diff --git a/swap/src/api.rs b/swap/src/api.rs index 6a97778c..6a90a4ce 100644 --- a/swap/src/api.rs +++ b/swap/src/api.rs @@ -36,6 +36,7 @@ use crate::fs::system_data_dir; use serde_json::json; use std::str::FromStr; use tokio::task; +use serde::ser::{Serialize, Serializer, SerializeStruct}; pub struct Request { @@ -56,8 +57,8 @@ pub struct Params { pub struct Init { db: Arc, - pub bitcoin_wallet: Option, - monero_wallet: Option, + pub bitcoin_wallet: Option>, + monero_wallet: Option>, tor_socks5_port: Option, namespace: XmrBtcNamespace, //server_handle: Option>, @@ -69,9 +70,91 @@ pub struct Init { } impl Request { - pub async fn call(&self, api_init: &Init) -> Result { + pub async fn call(&self, api_init: Arc) -> Result { let result = match self.cmd { Command::BuyXmr => { + let swap_id = Uuid::new_v4(); + + let seed = api_init.seed.as_ref().unwrap(); + let env_config = env_config_from(api_init.is_testnet); + let btc = api_init.bitcoin_wallet.as_ref().unwrap(); + let seller = self.params.seller.clone().unwrap(); + let monero_receive_address = self.params.monero_receive_address.unwrap(); + let bitcoin_change_address = self.params.bitcoin_change_address.clone().unwrap(); + + let bitcoin_wallet = btc; + let seller_peer_id = self.params.seller.as_ref().unwrap() + .extract_peer_id() + .context("Seller address must contain peer ID")?; + api_init.db.insert_address(seller_peer_id, seller.clone()).await?; + + let behaviour = cli::Behaviour::new( + seller_peer_id, + env_config_from(api_init.is_testnet), + bitcoin_wallet.clone(), + (seed.derive_libp2p_identity(), api_init.namespace), + ); + let mut swarm = + swarm::cli(seed.derive_libp2p_identity(), api_init.tor_socks5_port.unwrap(), behaviour).await?; + swarm.behaviour_mut().add_address(seller_peer_id, seller); + + tracing::debug!(peer_id = %swarm.local_peer_id(), "Network layer initialized"); + + let (event_loop, mut event_loop_handle) = + EventLoop::new(swap_id, swarm, seller_peer_id)?; + let event_loop = tokio::spawn(event_loop.run()); + + let max_givable = || bitcoin_wallet.max_giveable(TxLock::script_size()); + let estimate_fee = |amount| bitcoin_wallet.estimate_fee(TxLock::weight(), amount); + + let (amount, fees) = match determine_btc_to_swap( + api_init.json, + event_loop_handle.request_quote(), + bitcoin_wallet.new_address(), + || bitcoin_wallet.balance(), + max_givable, + || bitcoin_wallet.sync(), + estimate_fee, + ) + .await + { + Ok(val) => val, + Err(error) => match error.downcast::() { + Ok(_) => { + bail!("Seller's XMR balance is currently too low to initiate a swap, please try again later") + } + Err(other) => bail!(other), + }, + }; + + tracing::info!(%amount, %fees, "Determined swap amount"); + + api_init.db.insert_peer_id(swap_id, seller_peer_id).await?; + api_init.db.insert_monero_address(swap_id, monero_receive_address) + .await?; + let monero_wallet = api_init.monero_wallet.as_ref().unwrap(); + + let swap = Swap::new( + Arc::clone(&api_init.db), + swap_id, + Arc::clone(&bitcoin_wallet), + Arc::clone(&monero_wallet), + env_config, + event_loop_handle, + monero_receive_address, + bitcoin_change_address, + amount, + ); + + tokio::select! { + result = event_loop => { + result + .context("EventLoop panicked")?; + }, + result = bob::run(swap) => { + result.context("Failed to complete swap")?; + } + } json!({ "empty": "true" }) @@ -110,6 +193,9 @@ impl Request { Some(handle) } }; + loop { + + } json!({ "empty": "true" }) @@ -217,21 +303,21 @@ impl Init { cli::tracing::init(debug, json, data_dir.join("logs"), None)?; let init = Init { - bitcoin_wallet: Some(init_bitcoin_wallet( + bitcoin_wallet: Some(Arc::new(init_bitcoin_wallet( bitcoin_electrum_rpc_url, &seed, data_dir.clone(), env_config, bitcoin_target_block, ) - .await?), + .await?)), - monero_wallet: Some(init_monero_wallet( + monero_wallet: Some(Arc::new(init_monero_wallet( data_dir.clone(), monero_daemon_address, env_config, ) - .await?.0), + .await?.0)), tor_socks5_port: tor_socks5_port, namespace: XmrBtcNamespace::from_is_testnet(is_testnet), db: open_db(data_dir.join("sqlite")).await?, @@ -308,14 +394,14 @@ impl Init { cli::tracing::init(debug, json, data_dir.join("logs"), None)?; let init = Init { - bitcoin_wallet: Some(init_bitcoin_wallet( + bitcoin_wallet: Some(Arc::new(init_bitcoin_wallet( bitcoin_electrum_rpc_url, &seed, data_dir.clone(), env_config, bitcoin_target_block, ) - .await?), + .await?)), monero_wallet: None, tor_socks5_port, namespace: XmrBtcNamespace::from_is_testnet(is_testnet), @@ -328,8 +414,19 @@ impl Init { }; Ok(init) } +} - +impl Serialize for Init { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + // 3 is the number of fields in the struct. + let mut state = serializer.serialize_struct("Init", 3)?; + state.serialize_field("debug", &self.debug)?; + state.serialize_field("json", &self.json)?; + state.end() + } } async fn init_bitcoin_wallet( diff --git a/swap/src/bin/swap.rs b/swap/src/bin/swap.rs index 241258ca..9bddbb43 100644 --- a/swap/src/bin/swap.rs +++ b/swap/src/bin/swap.rs @@ -16,6 +16,7 @@ use anyhow::Result; use std::env; use swap::cli::command::{parse_args_and_apply_defaults, ParseResult}; use swap::common::check_latest_version; +use std::sync::Arc; #[tokio::main] async fn main() -> Result<()> { @@ -30,7 +31,7 @@ async fn main() -> Result<()> { if let Err(e) = check_latest_version(env!("CARGO_PKG_VERSION")).await { eprintln!("{}", e); } - let result = request.call(&api_init).await?; + let result = request.call(Arc::clone(&api_init)).await?; println!("{}", result); Ok(()) } diff --git a/swap/src/rpc.rs b/swap/src/rpc.rs index f3af2416..e92bd7da 100644 --- a/swap/src/rpc.rs +++ b/swap/src/rpc.rs @@ -2,6 +2,7 @@ use std::net::SocketAddr; use jsonrpsee::http_server::{RpcModule, HttpServerBuilder, HttpServerHandle}; use thiserror::Error; use crate::api::{Init}; +use std::sync::Arc; pub mod methods; @@ -11,11 +12,11 @@ pub enum Error { ExampleError, } -pub async fn run_server(server_address: SocketAddr, api_init: &Init) -> anyhow::Result<(SocketAddr, HttpServerHandle)> { +pub async fn run_server(server_address: SocketAddr, api_init: Arc) -> anyhow::Result<(SocketAddr, HttpServerHandle)> { let server = HttpServerBuilder::default().build(server_address).await?; let mut modules = RpcModule::new(()); { - modules.merge(methods::register_modules(&api_init)) + modules.merge(methods::register_modules(Arc::clone(&api_init))) .unwrap() } diff --git a/swap/src/rpc/methods.rs b/swap/src/rpc/methods.rs index f1d848c4..6c734a0a 100644 --- a/swap/src/rpc/methods.rs +++ b/swap/src/rpc/methods.rs @@ -8,25 +8,133 @@ use std::str::FromStr; use crate::cli::command::{DEFAULT_ELECTRUM_RPC_URL_TESTNET, DEFAULT_BITCOIN_CONFIRMATION_TARGET_TESTNET}; use crate::rpc::Error; use crate::{bitcoin, cli, monero}; +use std::sync::Arc; +use serde_json::json; +use uuid::Uuid; +use std::collections::HashMap; +use libp2p::core::Multiaddr; - -pub fn register_modules(api_init: &Init) -> RpcModule<()> { - let mut module = RpcModule::new(()); +pub fn register_modules(context: Arc) -> RpcModule> { + let mut module = RpcModule::new(context); module - .register_async_method("get_bitcoin_balance", |_, _| async { - get_bitcoin_balance().await.map_err(|err| jsonrpsee_core::Error::Custom(err.to_string())) - }) + .register_async_method("get_bitcoin_balance", |_, context| async move { + get_bitcoin_balance(&context).await.map_err(|err| jsonrpsee_core::Error::Custom(err.to_string())) + }, + ) + .unwrap(); + module + .register_async_method("get_history", |_, context| async move { + get_history(&context).await.map_err(|err| jsonrpsee_core::Error::Custom(err.to_string())) + }, + ) + .unwrap(); + module + .register_async_method("resume_swap", |params, context| async move { + let swap_id: HashMap = params.parse()?; + let swap_id = Uuid::from_str(swap_id.get("swap_id").ok_or_else(|| jsonrpsee_core::Error::Custom("Does not contain swap_id".to_string()))?).unwrap(); + resume_swap(swap_id, &context).await.map_err(|err| jsonrpsee_core::Error::Custom(err.to_string())) + }, + ) + .unwrap(); + module + .register_async_method("withdraw_btc", |params, context| async move { + let map_params: HashMap = params.parse()?; + let amount = if let Some(amount_str) = map_params.get("amount") { + Some(::bitcoin::Amount::from_str_in(amount_str, ::bitcoin::Denomination::Bitcoin).map_err(|err| jsonrpsee_core::Error::Custom("Unable to parse amount".to_string()))?) + } else { + None + }; + let withdraw_address = bitcoin::Address::from_str(map_params.get("address").ok_or_else(|| jsonrpsee_core::Error::Custom("Does not contain address".to_string()))?).unwrap(); + withdraw_btc(withdraw_address, amount, &context).await.map_err(|err| jsonrpsee_core::Error::Custom(err.to_string())) + }, + ) + .unwrap(); + module + .register_async_method("buy_xmr", |params, context| async move { + let map_params: HashMap = params.parse()?; + let bitcoin_change_address = bitcoin::Address::from_str(map_params.get("bitcoin_change_address").ok_or_else(|| jsonrpsee_core::Error::Custom("Does not contain bitcoin_change_address".to_string()))?).unwrap(); + let monero_receive_address = monero::Address::from_str(map_params.get("monero_receive_address").ok_or_else(|| jsonrpsee_core::Error::Custom("Does not contain monero_receiveaddress".to_string()))?).unwrap(); + let seller = Multiaddr::from_str(map_params.get("seller").ok_or_else(|| jsonrpsee_core::Error::Custom("Does not contain seller".to_string()))?).unwrap(); + buy_xmr(bitcoin_change_address, monero_receive_address, seller, &context).await.map_err(|err| jsonrpsee_core::Error::Custom(err.to_string())) + }, + ) + .unwrap(); + module + .register_async_method("list_sellers", |params, context| async move { + let map_params: HashMap = params.parse()?; + let rendezvous_point = Multiaddr::from_str(map_params.get("rendezvous_point").ok_or_else(|| jsonrpsee_core::Error::Custom("Does not contain rendezvous_point".to_string()))?).unwrap(); + list_sellers(rendezvous_point, &context).await.map_err(|err| jsonrpsee_core::Error::Custom(err.to_string())) + }, + ) .unwrap(); module - } -async fn get_bitcoin_balance() -> anyhow::Result<(), Error> { +async fn get_bitcoin_balance(context: &Arc>) -> anyhow::Result { let request = Request { params: Params::default(), cmd: Command::Balance, }; - // request.call(api_init).await; - Ok(()) - + let balance = request.call(Arc::clone(context)).await.unwrap(); + Ok(balance) +} + +async fn get_history(context: &Arc>) -> anyhow::Result { + let request = Request { + params: Params::default(), + cmd: Command::History, + }; + let history = request.call(Arc::clone(context)).await.unwrap(); + Ok(history) +} + +async fn resume_swap(swap_id: Uuid, context: &Arc>) -> anyhow::Result { + let request = Request { + params: Params { + swap_id: Some(swap_id), + ..Default::default() + }, + cmd: Command::Resume, + }; + + let result = request.call(Arc::clone(context)).await.unwrap(); + Ok(result) +} +async fn withdraw_btc(withdraw_address: bitcoin::Address, amount: Option, context: &Arc>) -> anyhow::Result { + let request = Request { + params: Params { + amount: amount, + address: Some(withdraw_address), + ..Default::default() + }, + cmd: Command::WithdrawBtc, + }; + let result = request.call(Arc::clone(context)).await.unwrap(); + Ok(result) +} + +async fn buy_xmr(bitcoin_change_address: bitcoin::Address, monero_receive_address: monero::Address, seller: Multiaddr, context: &Arc>) -> anyhow::Result { + let request = Request { + params: Params { + bitcoin_change_address: Some(bitcoin_change_address), + monero_receive_address: Some(monero_receive_address), + seller: Some(seller), + ..Default::default() + }, + cmd: Command::BuyXmr, + }; + let swap = request.call(Arc::clone(context)).await.unwrap(); + Ok(swap) +} + +async fn list_sellers(rendezvous_point: Multiaddr, context: &Arc>) -> anyhow::Result { + let request = Request { + params: Params { + rendezvous_point: Some(rendezvous_point), + ..Default::default() + }, + cmd: Command::ListSellers, + }; + let result = request.call(Arc::clone(context)).await.unwrap(); + Ok(result) }