RPC server for API Interface (#1276)

* saving: implementing internal api shared by cli and rpc server

* writing async rpc methods and using arc for shared struct references

* cleaning up, renamed Init to Context

* saving: cleaning up and initial work for tests

* Respond with bitcoin withdraw txid

* Print RPC server address

* Cleanup, formatting, add `get_seller`, `get_swap_start_date` RPC endpoints

* fixing tests in cli module

* uncommenting and fixing more tests

* split api module and propagate errors with rpc server

* moving methods to api and validating addresses for rpc

* add broadcast channel to handle shutdowns gracefully and prepare for RPC server test

* added files

* Update rpc.rs

* adding new unfinished RPC tests

* updating rpc-server tests

* fixing warnings

* fixing formatting and cargo clippy warnings

* fix missing import in test

* fix: add data_dir to config to make config command work

* set server listen address manually and return file locations in JSON on Config

* Add called api method and swap_id to tracing for context, reduced boilerplate

* Pass server_address properly to RpcServer

* Update Cargo.lock

* dprint fmt

* Add cancel_refund RPC endpoint

* Combine Cmd and Params

* Disallow concurrent swaps

* Use RwLock instead of Mutex to allow for parallel reads and add get_current_swap endpoint

* Return wallet descriptor to RPC API caller

* Append all cli logs to single log file

After careful consideration, I've concluded that it's not practical/possible to ensure that the previous behaviour (one log file per swap) is preserved due to limitations of the tracing-subscriber crate and a big in the built in JSON formatter

* Add get_swap_expired_timelock timelock, other small refactoring

- Add get_swap_expired_timelock endpoint to return expired timelock if one exists. Fails if bitcoin lock tx has not yet published or if swap is already finished.
- Rename current_epoch to expired_timelock to enforce consistent method names
- Add blocks left until current expired timelock expires (next timelock expires) to ExpiredTimelock struct
- Change .expect() to .unwrap() in rpc server method register because those will only fail if we register the same method twice which will never happen

* initiating swaps in a separate task and handling shutdown signals with broadcast queues

* Replace get_swap_start_date, get_seller, get_expired_timelock with one get_swap_info rpc method

* WIP: Struct for concurrent swaps manager

* Ensure correct tracing spans

* Add note regarding Request, Method structs

* Update request.rs

* Add tracing span attribute log_reference_id to logs caused by rpc call

* Sync bitcoin wallet before initial max_giveable call

* use Span::current() to pass down to tracing span to spawned tasks

* Remove unused shutdown channel

* Add `get_monero_recovery_info` RPC endpoint

- Add `get_monero_recovery_info` RPC endpoint
- format PrivateViewKey using Display

* Rename `Method::RawHistory` to `Method::GetRawStates`

* Wait for swap to be suspended after sending signal

* Remove notes

* Add tracing span attribute log_reference_id to logs caused by rpc call

* Sync bitcoin wallet before initial max_giveable call

* use Span::current() to pass down to tracing span to spawned tasks

* Remove unused shutdown channel

* Add `get_monero_recovery_info` RPC endpoint

- Add `get_monero_recovery_info` RPC endpoint
- format PrivateViewKey using Display

* Rename `Method::RawHistory` to `Method::GetRawStates`

* Wait for swap to be suspended after sending signal

* Return additonal info on GetSwapInfo

* Update wallet.rs

* fix compile issues for tests and use serial_test crate

* fix rpc tests, only check for RPC errors and not returned values

* Rename `get_raw_history` tp `get_raw_states`

* Fix typo in rpc server stopped tracing log

* Remove unnecessary success property on suspend_current_swap response

* fixing test_cli_arguments and other tests

* WIP: RPC server integration tests

* WIP: Integration tests for RPC server

* Update rpc tests

* fix compile and warnings in tests/rpc.rs

* test: fix assert

* clippy --fix

* remove otp file

* cargo clippy fixes

* move resume swap initialization code out of spawned task

* Use `in_current_span` to pass down tracing span to spawned tasks

* moving buy_xmr initialization code out of spawned tasks

* cargo fmt

* Moving swap initialization code inside tokio select block to handle swap lock release logic

* Remove unnecessary swap suspension listener from determine_btc_to_swap call in BuyXmr

* Spawn event loop before requesting quote

* Release swap lock after receiving shutdown signal

* Remove inner tokio::select in BuyXmr and Resume

* Improve debug text for swap resume

* Return error to API caller if bid quote request fails

* Print error if one occurs during process invoked by API call

* Return bid quote to API caller

* Use type safe query! macro for database retrieval of states

* Return tx_lock_fee to API caller on GetSwapInfo call

Update request.rs

* Allow API caller to retrieve last synced bitcoin balane and avoid costly sync

* Return restore height on MoneroRecovery command to API Caller

* Include entire error cause-chain in API response

* Add span to bitcoin wallet logs

* Log event loop connection properties as tracing fields

* Wait for background tasks to complete before exiting CLI

* clippy

* specify sqlx patch version explicitly

* remove mem::forget and replace with _guard

* ci: add rpc test job

* test: wrap rpc test in #[cfg(test)]

* add missing tokio::test attribute

* fix and merge rpc tests, parse uuuid and multiaddr from serde_json value

* default Tor socks port to 9050, Cargo fmt

* Update swap/sqlite_dev_setup.sh: add version

Co-authored-by: Byron Hambly <byron@hambly.dev>

* ci: free up space on ubuntu test job

* Update swap/src/bitcoin/wallet.rs

Co-authored-by: Byron Hambly <byron@hambly.dev>

* Update swap/src/bitcoin/wallet.rs

Co-authored-by: Byron Hambly <byron@hambly.dev>

* fmt

---------

Co-authored-by: binarybaron <86064887+binarybaron@users.noreply.github.com>
Co-authored-by: Byron Hambly <byron@hambly.dev>
This commit is contained in:
yamabiiko 2024-05-22 16:12:58 +03:00 committed by GitHub
parent 7d4a6c90f3
commit 5ff46be279
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 3471 additions and 1505 deletions

View File

@ -175,6 +175,17 @@ jobs:
- name: Run test ${{ matrix.test_name }}
run: cargo test --package swap --all-features --test ${{ matrix.test_name }} -- --nocapture
rpc_tests:
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/checkout@v4.1.1
- uses: Swatinem/rust-cache@v2.7.1
- name: Run RPC server tests
run: cargo test --package swap --all-features --test rpc -- --nocapture
check_stable:
runs-on: ubuntu-latest
steps:

230
Cargo.lock generated
View File

@ -143,6 +143,15 @@ dependencies = [
"tokio",
]
[[package]]
name = "async-lock"
version = "2.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa24f727524730b077666307f2734b4a1a1c57acb79193127dcc8914d5242dd7"
dependencies = [
"event-listener",
]
[[package]]
name = "async-trait"
version = "0.1.80"
@ -308,6 +317,15 @@ version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445"
[[package]]
name = "beef"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a8241f3ebb85c056b509d4327ad0358fbbba6ffb340bf388f26350aeda225b1"
dependencies = [
"serde",
]
[[package]]
name = "big-bytes"
version = "1.0.0"
@ -519,6 +537,16 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "771fe0050b883fcc3ea2359b1a96bcfbc090b7116eae7c3c512c7a083fdf23d3"
[[package]]
name = "bstr"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6798148dccfbff0fae41c7574d2fa8f1ef3492fba0face179de5d8d447d67b05"
dependencies = [
"memchr",
"serde",
]
[[package]]
name = "bumpalo"
version = "3.6.1"
@ -1479,6 +1507,19 @@ dependencies = [
"url",
]
[[package]]
name = "globset"
version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "029d74589adefde59de1a0c4f4732695c32805624aec7b68d91503d4dba79afc"
dependencies = [
"aho-corasick",
"bstr",
"fnv",
"log",
"regex",
]
[[package]]
name = "h2"
version = "0.3.18"
@ -1566,9 +1607,9 @@ dependencies = [
[[package]]
name = "hermit-abi"
version = "0.3.1"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286"
checksum = "379dada1584ad501b383485dd706b8afb7a70fcbc7f4da7d780638a5a6124a60"
[[package]]
name = "hex"
@ -1818,13 +1859,13 @@ checksum = "47be2f14c678be2fdcab04ab1171db51b2762ce6f0a8ee87c8dd4a04ed216135"
[[package]]
name = "is-terminal"
version = "0.4.9"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b"
checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b"
dependencies = [
"hermit-abi 0.3.1",
"rustix",
"windows-sys 0.48.0",
"hermit-abi 0.3.8",
"libc",
"windows-sys 0.52.0",
]
[[package]]
@ -1893,6 +1934,115 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "jsonrpsee"
version = "0.16.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d291e3a5818a2384645fd9756362e6d89cf0541b0b916fa7702ea4a9833608e"
dependencies = [
"jsonrpsee-core",
"jsonrpsee-server",
"jsonrpsee-types",
"jsonrpsee-ws-client",
]
[[package]]
name = "jsonrpsee-client-transport"
version = "0.16.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "965de52763f2004bc91ac5bcec504192440f0b568a5d621c59d9dbd6f886c3fb"
dependencies = [
"futures-util",
"http 0.2.11",
"jsonrpsee-core",
"jsonrpsee-types",
"pin-project 1.0.5",
"rustls-native-certs 0.6.3",
"soketto",
"thiserror",
"tokio",
"tokio-rustls 0.23.1",
"tokio-util",
"tracing",
"webpki-roots 0.22.2",
]
[[package]]
name = "jsonrpsee-core"
version = "0.16.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4e70b4439a751a5de7dd5ed55eacff78ebf4ffe0fc009cb1ebb11417f5b536b"
dependencies = [
"anyhow",
"arrayvec",
"async-lock",
"async-trait",
"beef",
"futures-channel",
"futures-timer",
"futures-util",
"globset",
"hyper 0.14.28",
"jsonrpsee-types",
"parking_lot 0.12.0",
"rand 0.8.3",
"rustc-hash",
"serde",
"serde_json",
"soketto",
"thiserror",
"tokio",
"tracing",
]
[[package]]
name = "jsonrpsee-server"
version = "0.16.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fb69dad85df79527c019659a992498d03f8495390496da2f07e6c24c2b356fc"
dependencies = [
"futures-channel",
"futures-util",
"http 0.2.11",
"hyper 0.14.28",
"jsonrpsee-core",
"jsonrpsee-types",
"serde",
"serde_json",
"soketto",
"tokio",
"tokio-stream",
"tokio-util",
"tower",
"tracing",
]
[[package]]
name = "jsonrpsee-types"
version = "0.16.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bd522fe1ce3702fd94812965d7bb7a3364b1c9aba743944c5a00529aae80f8c"
dependencies = [
"anyhow",
"beef",
"serde",
"serde_json",
"thiserror",
"tracing",
]
[[package]]
name = "jsonrpsee-ws-client"
version = "0.16.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b83daeecfc6517cfe210df24e570fb06213533dfb990318fae781f4c7119dd9"
dependencies = [
"http 0.2.11",
"jsonrpsee-client-transport",
"jsonrpsee-core",
"jsonrpsee-types",
]
[[package]]
name = "keccak"
version = "0.1.0"
@ -3470,6 +3620,12 @@ version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]]
name = "rustc-hash"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc-hex"
version = "2.1.0"
@ -3553,6 +3709,18 @@ dependencies = [
"security-framework",
]
[[package]]
name = "rustls-native-certs"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00"
dependencies = [
"openssl-probe",
"rustls-pemfile",
"schannel",
"security-framework",
]
[[package]]
name = "rustls-pemfile"
version = "1.0.0"
@ -3723,6 +3891,21 @@ dependencies = [
"pest",
]
[[package]]
name = "sequential-macro"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb5facc5f409a55d25bf271c853402a00e1187097d326757043f5dd711944d07"
[[package]]
name = "sequential-test"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0d9c0d773bc7e7733264f460e5dfa00b2510421ddd6284db0749eef8dfb79e9"
dependencies = [
"sequential-macro",
]
[[package]]
name = "serde"
version = "1.0.202"
@ -3936,9 +4119,9 @@ checksum = "0f0242b8e50dd9accdd56170e94ca1ebd223b098eb9c83539a6e367d0f36ae68"
[[package]]
name = "similar"
version = "2.2.1"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "420acb44afdae038210c99e69aae24109f32f15500aa708e81d46c9f29d55fcf"
checksum = "2aeaf503862c419d66959f5d7ca015337d864e9c49485d771b732e2a20453597"
[[package]]
name = "slab"
@ -4019,14 +4202,15 @@ dependencies = [
[[package]]
name = "soketto"
version = "0.7.0"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "083624472e8817d44d02c0e55df043737ff11f279af924abdf93845717c2b75c"
checksum = "41d1c5305e39e09653383c2c7244f2f78b3bcae37cf50c64cb4789c9f5096ec2"
dependencies = [
"base64 0.13.1",
"bytes",
"flate2",
"futures",
"http 0.2.11",
"httparse",
"log",
"rand 0.8.3",
@ -4280,6 +4464,8 @@ dependencies = [
"hex",
"hyper 1.3.1",
"itertools 0.13.0",
"jsonrpsee",
"jsonrpsee-core",
"libp2p",
"mockito",
"monero",
@ -4294,6 +4480,7 @@ dependencies = [
"reqwest",
"rust_decimal",
"rust_decimal_macros",
"sequential-test",
"serde",
"serde_cbor",
"serde_json",
@ -4657,6 +4844,7 @@ checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1"
dependencies = [
"bytes",
"futures-core",
"futures-io",
"futures-sink",
"pin-project-lite 0.2.13",
"tokio",
@ -4723,6 +4911,23 @@ dependencies = [
"tokio",
]
[[package]]
name = "tower"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower-layer"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0"
[[package]]
name = "tower-service"
version = "0.3.1"
@ -4735,6 +4940,7 @@ version = "0.1.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [
"log",
"pin-project-lite 0.2.13",
"tracing-attributes",
"tracing-core",
@ -4921,7 +5127,7 @@ dependencies = [
"log",
"rand 0.8.3",
"rustls 0.19.0",
"rustls-native-certs",
"rustls-native-certs 0.5.0",
"sha-1",
"thiserror",
"url",

View File

@ -32,6 +32,8 @@ ed25519-dalek = "1"
futures = { version = "0.3", default-features = false }
hex = "0.4"
itertools = "0.13"
jsonrpsee = { version = "0.16.2", features = [ "server" ] }
jsonrpsee-core = "0.16.2"
libp2p = { version = "0.42.2", default-features = false, features = [ "tcp-tokio", "yamux", "mplex", "dns-tokio", "noise", "request-response", "websocket", "ping", "rendezvous", "identify" ] }
monero = { version = "0.12", features = [ "serde_support" ] }
monero-rpc = { path = "../monero-rpc" }
@ -49,12 +51,12 @@ serde_json = "1"
serde_with = { version = "1", features = [ "macros" ] }
sha2 = "0.10"
sigma_fun = { git = "https://github.com/LLFourn/secp256kfun", default-features = false, features = [ "ed25519", "serde", "secp256k1", "alloc" ] }
sqlx = { version = "0.6", features = [ "sqlite", "runtime-tokio-rustls", "offline" ] }
sqlx = { version = "0.6.3", features = [ "sqlite", "runtime-tokio-rustls", "offline" ] }
structopt = "0.3"
strum = { version = "0.26", features = [ "derive" ] }
thiserror = "1"
time = "0.3"
tokio = { version = "1", features = [ "rt-multi-thread", "time", "macros", "sync", "process", "fs", "net" ] }
tokio = { version = "1", features = [ "rt-multi-thread", "time", "macros", "sync", "process", "fs", "net", "parking_lot" ] }
tokio-socks = "0.5"
tokio-tungstenite = { version = "0.15", features = [ "rustls-tls" ] }
tokio-util = { version = "0.7", features = [ "io", "codec" ] }
@ -78,10 +80,12 @@ zip = "0.5"
bitcoin-harness = "0.2.2"
get-port = "3"
hyper = "1.3"
jsonrpsee = { version = "0.16.2", features = [ "ws-client" ] }
mockito = "1.3.0"
monero-harness = { path = "../monero-harness" }
port_check = "0.2"
proptest = "1"
sequential-test = "0.2.4"
serde_cbor = "0.11"
serial_test = "3.0"
spectral = "0.6"

View File

@ -1,7 +1,8 @@
#!/bin/bash
# run this script from the swap dir
# make sure you have sqlx-cli installed: cargo install sqlx-cli
# make sure you have sqlx-cli installed: cargo install --version 0.6.3 sqlx-cli
# it's advised for the sqlx-cli to be the same version as specified in cargo.toml
# this script creates a temporary sqlite database
# then runs the migration scripts to create the tables (migrations folder)

View File

@ -28,6 +28,24 @@
},
"query": "\n insert into peer_addresses (\n peer_id,\n address\n ) values (?, ?);\n "
},
"0d465a17ebbb5761421def759c73cad023c30705d5b41a1399ef79d8d2571d7c": {
"describe": {
"columns": [
{
"name": "start_date",
"ordinal": 0,
"type_info": "Text"
}
],
"nullable": [
true
],
"parameters": {
"Right": 1
}
},
"query": "\n SELECT min(entered_at) as start_date\n FROM swap_states\n WHERE swap_id = ?\n "
},
"1ec38c85e7679b2eb42b3df75d9098772ce44fdb8db3012d3c2410d828b74157": {
"describe": {
"columns": [
@ -62,6 +80,30 @@
},
"query": "\n insert into peers (\n swap_id,\n peer_id\n ) values (?, ?);\n "
},
"3f2bfdd2d134586ccad22171cd85a465800fc5c4fdaf191d206974e530240c87": {
"describe": {
"columns": [
{
"name": "swap_id",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "state",
"ordinal": 1,
"type_info": "Text"
}
],
"nullable": [
false,
false
],
"parameters": {
"Right": 0
}
},
"query": "\n SELECT swap_id, state\n FROM swap_states\n "
},
"50a5764546f69c118fa0b64120da50f51073d36257d49768de99ff863e3511e0": {
"describe": {
"columns": [],
@ -90,24 +132,6 @@
},
"query": "\n SELECT state\n FROM swap_states\n WHERE swap_id = ?\n ORDER BY id desc\n LIMIT 1;\n\n "
},
"a0eb85d04ee3842c52291dad4d225941d1141af735922fcbc665868997fce304": {
"describe": {
"columns": [
{
"name": "address",
"ordinal": 0,
"type_info": "Text"
}
],
"nullable": [
false
],
"parameters": {
"Right": 1
}
},
"query": "\n SELECT address\n FROM peer_addresses\n WHERE peer_id = ?\n "
},
"b703032b4ddc627a1124817477e7a8e5014bdc694c36a14053ef3bb2fc0c69b0": {
"describe": {
"columns": [],
@ -135,5 +159,41 @@
}
},
"query": "\n SELECT address\n FROM monero_addresses\n WHERE swap_id = ?\n "
},
"d78acba5eb8563826dd190e0886aa665aae3c6f1e312ee444e65df1c95afe8b2": {
"describe": {
"columns": [
{
"name": "address",
"ordinal": 0,
"type_info": "Text"
}
],
"nullable": [
false
],
"parameters": {
"Right": 1
}
},
"query": "\n SELECT DISTINCT address\n FROM peer_addresses\n WHERE peer_id = ?\n "
},
"e05620f420f8c1022971eeb66a803323a8cf258cbebb2834e3f7cf8f812fa646": {
"describe": {
"columns": [
{
"name": "state",
"ordinal": 0,
"type_info": "Text"
}
],
"nullable": [
false
],
"parameters": {
"Right": 1
}
},
"query": "\n SELECT state\n FROM swap_states\n WHERE swap_id = ?\n "
}
}

460
swap/src/api.rs Normal file
View File

@ -0,0 +1,460 @@
pub mod request;
use crate::cli::command::{Bitcoin, Monero, Tor};
use crate::database::open_db;
use crate::env::{Config as EnvConfig, GetConfig, Mainnet, Testnet};
use crate::fs::system_data_dir;
use crate::network::rendezvous::XmrBtcNamespace;
use crate::protocol::Database;
use crate::seed::Seed;
use crate::{bitcoin, cli, monero};
use anyhow::{bail, Context as AnyContext, Error, Result};
use futures::future::try_join_all;
use std::fmt;
use std::future::Future;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::{Arc, Once};
use tokio::sync::{broadcast, broadcast::Sender, Mutex, RwLock};
use tokio::task::JoinHandle;
use url::Url;
static START: Once = Once::new();
#[derive(Clone, PartialEq, Debug)]
pub struct Config {
tor_socks5_port: u16,
namespace: XmrBtcNamespace,
server_address: Option<SocketAddr>,
pub env_config: EnvConfig,
seed: Option<Seed>,
debug: bool,
json: bool,
data_dir: PathBuf,
is_testnet: bool,
}
use uuid::Uuid;
#[derive(Default)]
pub struct PendingTaskList(Mutex<Vec<JoinHandle<()>>>);
impl PendingTaskList {
pub async fn spawn<F, T>(&self, future: F)
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let handle = tokio::spawn(async move {
let _ = future.await;
});
self.0.lock().await.push(handle);
}
pub async fn wait_for_tasks(&self) -> Result<()> {
let tasks = {
// Scope for the lock, to avoid holding it for the entire duration of the async block
let mut guard = self.0.lock().await;
guard.drain(..).collect::<Vec<_>>()
};
try_join_all(tasks).await?;
Ok(())
}
}
pub struct SwapLock {
current_swap: RwLock<Option<Uuid>>,
suspension_trigger: Sender<()>,
}
impl SwapLock {
pub fn new() -> Self {
let (suspension_trigger, _) = broadcast::channel(10);
SwapLock {
current_swap: RwLock::new(None),
suspension_trigger,
}
}
pub async fn listen_for_swap_force_suspension(&self) -> Result<(), Error> {
let mut listener = self.suspension_trigger.subscribe();
let event = listener.recv().await;
match event {
Ok(_) => Ok(()),
Err(e) => {
tracing::error!("Error receiving swap suspension signal: {}", e);
bail!(e)
}
}
}
pub async fn acquire_swap_lock(&self, swap_id: Uuid) -> Result<(), Error> {
let mut current_swap = self.current_swap.write().await;
if current_swap.is_some() {
bail!("There already exists an active swap lock");
}
tracing::debug!(swap_id = %swap_id, "Acquiring swap lock");
*current_swap = Some(swap_id);
Ok(())
}
pub async fn get_current_swap_id(&self) -> Option<Uuid> {
*self.current_swap.read().await
}
/// Sends a signal to suspend all ongoing swap processes.
///
/// This function performs the following steps:
/// 1. Triggers the suspension by sending a unit `()` signal to all listeners via `self.suspension_trigger`.
/// 2. Polls the `current_swap` state every 50 milliseconds to check if it has been set to `None`, indicating that the swap processes have been suspended and the lock released.
/// 3. If the lock is not released within 10 seconds, the function returns an error.
///
/// If we send a suspend signal while no swap is in progress, the function will not fail, but will return immediately.
///
/// # Returns
/// - `Ok(())` if the swap lock is successfully released.
/// - `Err(Error)` if the function times out waiting for the swap lock to be released.
///
/// # Notes
/// The 50ms polling interval is considered negligible overhead compared to the typical time required to suspend ongoing swap processes.
pub async fn send_suspend_signal(&self) -> Result<(), Error> {
const TIMEOUT: u64 = 10_000;
const INTERVAL: u64 = 50;
let _ = self.suspension_trigger.send(())?;
for _ in 0..(TIMEOUT / INTERVAL) {
if self.get_current_swap_id().await.is_none() {
return Ok(());
}
tokio::time::sleep(tokio::time::Duration::from_millis(INTERVAL)).await;
}
bail!("Timed out waiting for swap lock to be released");
}
pub async fn release_swap_lock(&self) -> Result<Uuid, Error> {
let mut current_swap = self.current_swap.write().await;
if let Some(swap_id) = current_swap.as_ref() {
tracing::debug!(swap_id = %swap_id, "Releasing swap lock");
let prev_swap_id = *swap_id;
*current_swap = None;
drop(current_swap);
Ok(prev_swap_id)
} else {
bail!("There is no current swap lock to release");
}
}
}
impl Default for SwapLock {
fn default() -> Self {
Self::new()
}
}
// workaround for warning over monero_rpc_process which we must own but not read
#[allow(dead_code)]
pub struct Context {
pub db: Arc<dyn Database + Send + Sync>,
bitcoin_wallet: Option<Arc<bitcoin::Wallet>>,
monero_wallet: Option<Arc<monero::Wallet>>,
monero_rpc_process: Option<monero::WalletRpcProcess>,
pub swap_lock: Arc<SwapLock>,
pub config: Config,
pub tasks: Arc<PendingTaskList>,
}
#[allow(clippy::too_many_arguments)]
impl Context {
pub async fn build(
bitcoin: Option<Bitcoin>,
monero: Option<Monero>,
tor: Option<Tor>,
data: Option<PathBuf>,
is_testnet: bool,
debug: bool,
json: bool,
server_address: Option<SocketAddr>,
) -> Result<Context> {
let data_dir = data::data_dir_from(data, is_testnet)?;
let env_config = env_config_from(is_testnet);
let seed = Seed::from_file_or_generate(data_dir.as_path())
.context("Failed to read seed in file")?;
let bitcoin_wallet = {
if let Some(bitcoin) = bitcoin {
let (bitcoin_electrum_rpc_url, bitcoin_target_block) =
bitcoin.apply_defaults(is_testnet)?;
Some(Arc::new(
init_bitcoin_wallet(
bitcoin_electrum_rpc_url,
&seed,
data_dir.clone(),
env_config,
bitcoin_target_block,
)
.await?,
))
} else {
None
}
};
let (monero_wallet, monero_rpc_process) = {
if let Some(monero) = monero {
let monero_daemon_address = monero.apply_defaults(is_testnet);
let (wlt, prc) =
init_monero_wallet(data_dir.clone(), monero_daemon_address, env_config).await?;
(Some(Arc::new(wlt)), Some(prc))
} else {
(None, None)
}
};
let tor_socks5_port = tor.map_or(9050, |tor| tor.tor_socks5_port);
START.call_once(|| {
let _ = cli::tracing::init(debug, json, data_dir.join("logs"));
});
let context = Context {
db: open_db(data_dir.join("sqlite")).await?,
bitcoin_wallet,
monero_wallet,
monero_rpc_process,
config: Config {
tor_socks5_port,
namespace: XmrBtcNamespace::from_is_testnet(is_testnet),
env_config,
seed: Some(seed),
server_address,
debug,
json,
is_testnet,
data_dir,
},
swap_lock: Arc::new(SwapLock::new()),
tasks: Arc::new(PendingTaskList::default()),
};
Ok(context)
}
pub async fn for_harness(
seed: Seed,
env_config: EnvConfig,
db_path: PathBuf,
bob_bitcoin_wallet: Arc<bitcoin::Wallet>,
bob_monero_wallet: Arc<monero::Wallet>,
) -> Self {
let config = Config::for_harness(seed, env_config);
Self {
bitcoin_wallet: Some(bob_bitcoin_wallet),
monero_wallet: Some(bob_monero_wallet),
config,
db: open_db(db_path)
.await
.expect("Could not open sqlite database"),
monero_rpc_process: None,
swap_lock: Arc::new(SwapLock::new()),
tasks: Arc::new(PendingTaskList::default()),
}
}
}
impl fmt::Debug for Context {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "")
}
}
async fn init_bitcoin_wallet(
electrum_rpc_url: Url,
seed: &Seed,
data_dir: PathBuf,
env_config: EnvConfig,
bitcoin_target_block: usize,
) -> Result<bitcoin::Wallet> {
let wallet_dir = data_dir.join("wallet");
let wallet = bitcoin::Wallet::new(
electrum_rpc_url.clone(),
&wallet_dir,
seed.derive_extended_private_key(env_config.bitcoin_network)?,
env_config,
bitcoin_target_block,
)
.await
.context("Failed to initialize Bitcoin wallet")?;
wallet.sync().await?;
Ok(wallet)
}
async fn init_monero_wallet(
data_dir: PathBuf,
monero_daemon_address: String,
env_config: EnvConfig,
) -> Result<(monero::Wallet, monero::WalletRpcProcess)> {
let network = env_config.monero_network;
const MONERO_BLOCKCHAIN_MONITORING_WALLET_NAME: &str = "swap-tool-blockchain-monitoring-wallet";
let monero_wallet_rpc = monero::WalletRpc::new(data_dir.join("monero")).await?;
let monero_wallet_rpc_process = monero_wallet_rpc
.run(network, Some(monero_daemon_address))
.await?;
let monero_wallet = monero::Wallet::open_or_create(
monero_wallet_rpc_process.endpoint(),
MONERO_BLOCKCHAIN_MONITORING_WALLET_NAME.to_string(),
env_config,
)
.await?;
Ok((monero_wallet, monero_wallet_rpc_process))
}
mod data {
use super::*;
pub fn data_dir_from(arg_dir: Option<PathBuf>, testnet: bool) -> Result<PathBuf> {
let base_dir = match arg_dir {
Some(custom_base_dir) => custom_base_dir,
None => os_default()?,
};
let sub_directory = if testnet { "testnet" } else { "mainnet" };
Ok(base_dir.join(sub_directory))
}
fn os_default() -> Result<PathBuf> {
Ok(system_data_dir()?.join("cli"))
}
}
fn env_config_from(testnet: bool) -> EnvConfig {
if testnet {
Testnet::get_config()
} else {
Mainnet::get_config()
}
}
impl Config {
pub fn for_harness(seed: Seed, env_config: EnvConfig) -> Self {
let data_dir = data::data_dir_from(None, false).expect("Could not find data directory");
Self {
tor_socks5_port: 9050,
namespace: XmrBtcNamespace::from_is_testnet(false),
server_address: None,
env_config,
seed: Some(seed),
debug: false,
json: false,
is_testnet: false,
data_dir,
}
}
}
#[cfg(test)]
pub mod api_test {
use super::*;
use crate::api::request::{Method, Request};
use libp2p::Multiaddr;
use std::str::FromStr;
use uuid::Uuid;
pub const MULTI_ADDRESS: &str =
"/ip4/127.0.0.1/tcp/9939/p2p/12D3KooWCdMKjesXMJz1SiZ7HgotrxuqhQJbP5sgBm2BwP1cqThi";
pub const MONERO_STAGENET_ADDRESS: &str = "53gEuGZUhP9JMEBZoGaFNzhwEgiG7hwQdMCqFxiyiTeFPmkbt1mAoNybEUvYBKHcnrSgxnVWgZsTvRBaHBNXPa8tHiCU51a";
pub const BITCOIN_TESTNET_ADDRESS: &str = "tb1qr3em6k3gfnyl8r7q0v7t4tlnyxzgxma3lressv";
pub const MONERO_MAINNET_ADDRESS: &str = "44Ato7HveWidJYUAVw5QffEcEtSH1DwzSP3FPPkHxNAS4LX9CqgucphTisH978FLHE34YNEx7FcbBfQLQUU8m3NUC4VqsRa";
pub const BITCOIN_MAINNET_ADDRESS: &str = "bc1qe4epnfklcaa0mun26yz5g8k24em5u9f92hy325";
pub const SWAP_ID: &str = "ea030832-3be9-454f-bb98-5ea9a788406b";
impl Config {
pub fn default(
is_testnet: bool,
data_dir: Option<PathBuf>,
debug: bool,
json: bool,
) -> Self {
let data_dir = data::data_dir_from(data_dir, is_testnet).unwrap();
let seed = Seed::from_file_or_generate(data_dir.as_path()).unwrap();
let env_config = env_config_from(is_testnet);
Self {
tor_socks5_port: 9050,
namespace: XmrBtcNamespace::from_is_testnet(is_testnet),
server_address: None,
env_config,
seed: Some(seed),
debug,
json,
is_testnet,
data_dir,
}
}
}
impl Request {
pub fn buy_xmr(is_testnet: bool) -> Request {
let seller = Multiaddr::from_str(MULTI_ADDRESS).unwrap();
let bitcoin_change_address = {
if is_testnet {
bitcoin::Address::from_str(BITCOIN_TESTNET_ADDRESS).unwrap()
} else {
bitcoin::Address::from_str(BITCOIN_MAINNET_ADDRESS).unwrap()
}
};
let monero_receive_address = {
if is_testnet {
monero::Address::from_str(MONERO_STAGENET_ADDRESS).unwrap()
} else {
monero::Address::from_str(MONERO_MAINNET_ADDRESS).unwrap()
}
};
Request::new(Method::BuyXmr {
seller,
bitcoin_change_address,
monero_receive_address,
swap_id: Uuid::new_v4(),
})
}
pub fn resume() -> Request {
Request::new(Method::Resume {
swap_id: Uuid::from_str(SWAP_ID).unwrap(),
})
}
pub fn cancel() -> Request {
Request::new(Method::CancelAndRefund {
swap_id: Uuid::from_str(SWAP_ID).unwrap(),
})
}
pub fn refund() -> Request {
Request::new(Method::CancelAndRefund {
swap_id: Uuid::from_str(SWAP_ID).unwrap(),
})
}
}
}

930
swap/src/api/request.rs Normal file
View File

@ -0,0 +1,930 @@
use crate::api::Context;
use crate::bitcoin::{Amount, ExpiredTimelocks, TxLock};
use crate::cli::{list_sellers, EventLoop, SellerStatus};
use crate::libp2p_ext::MultiAddrExt;
use crate::network::quote::{BidQuote, ZeroQuoteReceived};
use crate::network::swarm;
use crate::protocol::bob::{BobState, Swap};
use crate::protocol::{bob, State};
use crate::{bitcoin, cli, monero, rpc};
use anyhow::{bail, Context as AnyContext, Result};
use libp2p::core::Multiaddr;
use qrcode::render::unicode;
use qrcode::QrCode;
use serde_json::json;
use std::cmp::min;
use std::convert::TryInto;
use std::future::Future;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug_span, field, Instrument, Span};
use uuid::Uuid;
#[derive(PartialEq, Debug)]
pub struct Request {
pub cmd: Method,
pub log_reference: Option<String>,
}
#[derive(Debug, PartialEq)]
pub enum Method {
BuyXmr {
seller: Multiaddr,
bitcoin_change_address: bitcoin::Address,
monero_receive_address: monero::Address,
swap_id: Uuid,
},
Resume {
swap_id: Uuid,
},
CancelAndRefund {
swap_id: Uuid,
},
MoneroRecovery {
swap_id: Uuid,
},
History,
Config,
WithdrawBtc {
amount: Option<Amount>,
address: bitcoin::Address,
},
Balance {
force_refresh: bool,
},
ListSellers {
rendezvous_point: Multiaddr,
},
ExportBitcoinWallet,
SuspendCurrentSwap,
StartDaemon {
server_address: Option<SocketAddr>,
},
GetCurrentSwap,
GetSwapInfo {
swap_id: Uuid,
},
GetRawStates,
}
impl Method {
fn get_tracing_span(&self, log_reference_id: Option<String>) -> Span {
let span = match self {
Method::Balance { .. } => {
debug_span!(
"method",
method_name = "Balance",
log_reference_id = field::Empty
)
}
Method::BuyXmr { swap_id, .. } => {
debug_span!("method", method_name="BuyXmr", swap_id=%swap_id, log_reference_id=field::Empty)
}
Method::CancelAndRefund { swap_id } => {
debug_span!("method", method_name="CancelAndRefund", swap_id=%swap_id, log_reference_id=field::Empty)
}
Method::Resume { swap_id } => {
debug_span!("method", method_name="Resume", swap_id=%swap_id, log_reference_id=field::Empty)
}
Method::Config => {
debug_span!(
"method",
method_name = "Config",
log_reference_id = field::Empty
)
}
Method::ExportBitcoinWallet => {
debug_span!(
"method",
method_name = "ExportBitcoinWallet",
log_reference_id = field::Empty
)
}
Method::GetCurrentSwap => {
debug_span!(
"method",
method_name = "GetCurrentSwap",
log_reference_id = field::Empty
)
}
Method::GetSwapInfo { .. } => {
debug_span!(
"method",
method_name = "GetSwapInfo",
log_reference_id = field::Empty
)
}
Method::History => {
debug_span!(
"method",
method_name = "History",
log_reference_id = field::Empty
)
}
Method::ListSellers { .. } => {
debug_span!(
"method",
method_name = "ListSellers",
log_reference_id = field::Empty
)
}
Method::MoneroRecovery { .. } => {
debug_span!(
"method",
method_name = "MoneroRecovery",
log_reference_id = field::Empty
)
}
Method::GetRawStates => debug_span!(
"method",
method_name = "RawHistory",
log_reference_id = field::Empty
),
Method::StartDaemon { .. } => {
debug_span!(
"method",
method_name = "StartDaemon",
log_reference_id = field::Empty
)
}
Method::SuspendCurrentSwap => {
debug_span!(
"method",
method_name = "SuspendCurrentSwap",
log_reference_id = field::Empty
)
}
Method::WithdrawBtc { .. } => {
debug_span!(
"method",
method_name = "WithdrawBtc",
log_reference_id = field::Empty
)
}
};
if let Some(log_reference_id) = log_reference_id {
span.record("log_reference_id", log_reference_id.as_str());
}
span
}
}
impl Request {
pub fn new(cmd: Method) -> Request {
Request {
cmd,
log_reference: None,
}
}
pub fn with_id(cmd: Method, id: Option<String>) -> Request {
Request {
cmd,
log_reference: id,
}
}
async fn handle_cmd(self, context: Arc<Context>) -> Result<serde_json::Value> {
match self.cmd {
Method::SuspendCurrentSwap => {
let swap_id = context.swap_lock.get_current_swap_id().await;
if let Some(id_value) = swap_id {
context.swap_lock.send_suspend_signal().await?;
Ok(json!({ "swapId": id_value }))
} else {
bail!("No swap is currently running")
}
}
Method::GetSwapInfo { swap_id } => {
let bitcoin_wallet = context
.bitcoin_wallet
.as_ref()
.context("Could not get Bitcoin wallet")?;
let state = context.db.get_state(swap_id).await?;
let is_completed = state.swap_finished();
let peerId = context
.db
.get_peer_id(swap_id)
.await
.with_context(|| "Could not get PeerID")?;
let addresses = context
.db
.get_addresses(peerId)
.await
.with_context(|| "Could not get addressess")?;
let start_date = context.db.get_swap_start_date(swap_id).await?;
let swap_state: BobState = state.try_into()?;
let state_name = format!("{}", swap_state);
let (
xmr_amount,
btc_amount,
tx_lock_id,
tx_cancel_fee,
tx_refund_fee,
tx_lock_fee,
btc_refund_address,
cancel_timelock,
punish_timelock,
) = context
.db
.get_states(swap_id)
.await?
.iter()
.find_map(|state| {
if let State::Bob(BobState::SwapSetupCompleted(state2)) = state {
let xmr_amount = state2.xmr;
let btc_amount = state2.tx_lock.lock_amount().to_sat();
let tx_cancel_fee = state2.tx_cancel_fee.to_sat();
let tx_refund_fee = state2.tx_refund_fee.to_sat();
let tx_lock_id = state2.tx_lock.txid();
let btc_refund_address = state2.refund_address.to_string();
if let Ok(tx_lock_fee) = state2.tx_lock.fee() {
let tx_lock_fee = tx_lock_fee.to_sat();
Some((
xmr_amount,
btc_amount,
tx_lock_id,
tx_cancel_fee,
tx_refund_fee,
tx_lock_fee,
btc_refund_address,
state2.cancel_timelock,
state2.punish_timelock,
))
} else {
None
}
} else {
None
}
})
.with_context(|| "Did not find SwapSetupCompleted state for swap")?;
let timelock = match swap_state {
BobState::Started { .. }
| BobState::SafelyAborted
| BobState::SwapSetupCompleted(_) => None,
BobState::BtcLocked { state3: state, .. }
| BobState::XmrLockProofReceived { state, .. } => {
Some(state.expired_timelock(bitcoin_wallet).await)
}
BobState::XmrLocked(state) | BobState::EncSigSent(state) => {
Some(state.expired_timelock(bitcoin_wallet).await)
}
BobState::CancelTimelockExpired(state) | BobState::BtcCancelled(state) => {
Some(state.expired_timelock(bitcoin_wallet).await)
}
BobState::BtcPunished { .. } => Some(Ok(ExpiredTimelocks::Punish)),
BobState::BtcRefunded(_)
| BobState::BtcRedeemed(_)
| BobState::XmrRedeemed { .. } => None,
};
Ok(json!({
"swapId": swap_id,
"seller": {
"peerId": peerId.to_string(),
"addresses": addresses
},
"completed": is_completed,
"startDate": start_date,
"stateName": state_name,
"xmrAmount": xmr_amount,
"btcAmount": btc_amount,
"txLockId": tx_lock_id,
"txCancelFee": tx_cancel_fee,
"txRefundFee": tx_refund_fee,
"txLockFee": tx_lock_fee,
"btcRefundAddress": btc_refund_address.to_string(),
"cancelTimelock": cancel_timelock,
"punishTimelock": punish_timelock,
// If the timelock is None, it means that the swap is in a state where the timelock is not accessible to us.
// If that is the case, we return null. Otherwise, we return the timelock.
"timelock": timelock.map(|tl| tl.map(|tl| json!(tl)).unwrap_or(json!(null))).unwrap_or(json!(null)),
}))
}
Method::BuyXmr {
seller,
bitcoin_change_address,
monero_receive_address,
swap_id,
} => {
let bitcoin_wallet = Arc::clone(
context
.bitcoin_wallet
.as_ref()
.expect("Could not find Bitcoin wallet"),
);
let monero_wallet = Arc::clone(
context
.monero_wallet
.as_ref()
.context("Could not get Monero wallet")?,
);
let env_config = context.config.env_config;
let seed = context.config.seed.clone().context("Could not get seed")?;
let seller_peer_id = seller
.extract_peer_id()
.context("Seller address must contain peer ID")?;
context
.db
.insert_address(seller_peer_id, seller.clone())
.await?;
let behaviour = cli::Behaviour::new(
seller_peer_id,
env_config,
bitcoin_wallet.clone(),
(seed.derive_libp2p_identity(), context.config.namespace),
);
let mut swarm = swarm::cli(
seed.derive_libp2p_identity(),
context.config.tor_socks5_port,
behaviour,
)
.await?;
swarm.behaviour_mut().add_address(seller_peer_id, seller);
context
.db
.insert_monero_address(swap_id, monero_receive_address)
.await?;
tracing::debug!(peer_id = %swarm.local_peer_id(), "Network layer initialized");
context.swap_lock.acquire_swap_lock(swap_id).await?;
let initialize_swap = tokio::select! {
biased;
_ = context.swap_lock.listen_for_swap_force_suspension() => {
tracing::debug!("Shutdown signal received, exiting");
context.swap_lock.release_swap_lock().await.expect("Shutdown signal received but failed to release swap lock. The swap process has been terminated but the swap lock is still active.");
bail!("Shutdown signal received");
},
result = async {
let (event_loop, mut event_loop_handle) =
EventLoop::new(swap_id, swarm, seller_peer_id)?;
let event_loop = tokio::spawn(event_loop.run().in_current_span());
let bid_quote = event_loop_handle.request_quote().await?;
Ok::<_, anyhow::Error>((event_loop, event_loop_handle, bid_quote))
} => {
result
},
};
let (event_loop, event_loop_handle, bid_quote) = match initialize_swap {
Ok(result) => result,
Err(error) => {
tracing::error!(%swap_id, "Swap initialization failed: {:#}", error);
context
.swap_lock
.release_swap_lock()
.await
.expect("Could not release swap lock");
bail!(error);
}
};
context.tasks.clone().spawn(async move {
tokio::select! {
biased;
_ = context.swap_lock.listen_for_swap_force_suspension() => {
tracing::debug!("Shutdown signal received, exiting");
context.swap_lock.release_swap_lock().await.expect("Shutdown signal received but failed to release swap lock. The swap process has been terminated but the swap lock is still active.");
bail!("Shutdown signal received");
},
event_loop_result = event_loop => {
match event_loop_result {
Ok(_) => {
tracing::debug!(%swap_id, "EventLoop completed")
}
Err(error) => {
tracing::error!(%swap_id, "EventLoop failed: {:#}", error)
}
}
},
swap_result = async {
let max_givable = || bitcoin_wallet.max_giveable(TxLock::script_size());
let estimate_fee = |amount| bitcoin_wallet.estimate_fee(TxLock::weight(), amount);
let determine_amount = determine_btc_to_swap(
context.config.json,
bid_quote,
bitcoin_wallet.new_address(),
|| bitcoin_wallet.balance(),
max_givable,
|| bitcoin_wallet.sync(),
estimate_fee,
);
let (amount, fees) = match determine_amount.await {
Ok(val) => val,
Err(error) => match error.downcast::<ZeroQuoteReceived>() {
Ok(_) => {
bail!("Seller's XMR balance is currently too low to initiate a swap, please try again later")
}
Err(other) => bail!(other),
},
};
tracing::info!(%amount, %fees, "Determined swap amount");
context.db.insert_peer_id(swap_id, seller_peer_id).await?;
let swap = Swap::new(
Arc::clone(&context.db),
swap_id,
Arc::clone(&bitcoin_wallet),
monero_wallet,
env_config,
event_loop_handle,
monero_receive_address,
bitcoin_change_address,
amount,
);
bob::run(swap).await
} => {
match swap_result {
Ok(state) => {
tracing::debug!(%swap_id, state=%state, "Swap completed")
}
Err(error) => {
tracing::error!(%swap_id, "Failed to complete swap: {:#}", error)
}
}
},
};
tracing::debug!(%swap_id, "Swap completed");
context
.swap_lock
.release_swap_lock()
.await
.expect("Could not release swap lock");
Ok::<_, anyhow::Error>(())
}.in_current_span()).await;
Ok(json!({
"swapId": swap_id.to_string(),
"quote": bid_quote,
}))
}
Method::Resume { swap_id } => {
context.swap_lock.acquire_swap_lock(swap_id).await?;
let seller_peer_id = context.db.get_peer_id(swap_id).await?;
let seller_addresses = context.db.get_addresses(seller_peer_id).await?;
let seed = context
.config
.seed
.as_ref()
.context("Could not get seed")?
.derive_libp2p_identity();
let behaviour = cli::Behaviour::new(
seller_peer_id,
context.config.env_config,
Arc::clone(
context
.bitcoin_wallet
.as_ref()
.context("Could not get Bitcoin wallet")?,
),
(seed.clone(), context.config.namespace),
);
let mut swarm =
swarm::cli(seed.clone(), context.config.tor_socks5_port, behaviour).await?;
let our_peer_id = swarm.local_peer_id();
tracing::debug!(peer_id = %our_peer_id, "Network layer initialized");
for seller_address in seller_addresses {
swarm
.behaviour_mut()
.add_address(seller_peer_id, seller_address);
}
let (event_loop, event_loop_handle) =
EventLoop::new(swap_id, swarm, seller_peer_id)?;
let monero_receive_address = context.db.get_monero_address(swap_id).await?;
let swap = Swap::from_db(
Arc::clone(&context.db),
swap_id,
Arc::clone(
context
.bitcoin_wallet
.as_ref()
.context("Could not get Bitcoin wallet")?,
),
Arc::clone(
context
.monero_wallet
.as_ref()
.context("Could not get Monero wallet")?,
),
context.config.env_config,
event_loop_handle,
monero_receive_address,
)
.await?;
context.tasks.clone().spawn(
async move {
let handle = tokio::spawn(event_loop.run().in_current_span());
tokio::select! {
biased;
_ = context.swap_lock.listen_for_swap_force_suspension() => {
tracing::debug!("Shutdown signal received, exiting");
context.swap_lock.release_swap_lock().await.expect("Shutdown signal received but failed to release swap lock. The swap process has been terminated but the swap lock is still active.");
bail!("Shutdown signal received");
},
event_loop_result = handle => {
match event_loop_result {
Ok(_) => {
tracing::debug!(%swap_id, "EventLoop completed during swap resume")
}
Err(error) => {
tracing::error!(%swap_id, "EventLoop failed during swap resume: {:#}", error)
}
}
},
swap_result = bob::run(swap) => {
match swap_result {
Ok(state) => {
tracing::debug!(%swap_id, state=%state, "Swap completed after resuming")
}
Err(error) => {
tracing::error!(%swap_id, "Failed to resume swap: {:#}", error)
}
}
}
}
context
.swap_lock
.release_swap_lock()
.await
.expect("Could not release swap lock");
Ok::<(), anyhow::Error>(())
}
.in_current_span(),
).await;
Ok(json!({
"result": "ok",
}))
}
Method::CancelAndRefund { swap_id } => {
let bitcoin_wallet = context
.bitcoin_wallet
.as_ref()
.context("Could not get Bitcoin wallet")?;
context.swap_lock.acquire_swap_lock(swap_id).await?;
let state = cli::cancel_and_refund(
swap_id,
Arc::clone(bitcoin_wallet),
Arc::clone(&context.db),
)
.await;
context
.swap_lock
.release_swap_lock()
.await
.expect("Could not release swap lock");
state.map(|state| {
json!({
"result": state,
})
})
}
Method::History => {
let swaps = context.db.all().await?;
let mut vec: Vec<(Uuid, String)> = Vec::new();
for (swap_id, state) in swaps {
let state: BobState = state.try_into()?;
vec.push((swap_id, state.to_string()));
}
Ok(json!({ "swaps": vec }))
}
Method::GetRawStates => {
let raw_history = context.db.raw_all().await?;
Ok(json!({ "raw_states": raw_history }))
}
Method::Config => {
let data_dir_display = context.config.data_dir.display();
tracing::info!(path=%data_dir_display, "Data directory");
tracing::info!(path=%format!("{}/logs", data_dir_display), "Log files directory");
tracing::info!(path=%format!("{}/sqlite", data_dir_display), "Sqlite file location");
tracing::info!(path=%format!("{}/seed.pem", data_dir_display), "Seed file location");
tracing::info!(path=%format!("{}/monero", data_dir_display), "Monero-wallet-rpc directory");
tracing::info!(path=%format!("{}/wallet", data_dir_display), "Internal bitcoin wallet directory");
Ok(json!({
"log_files": format!("{}/logs", data_dir_display),
"sqlite": format!("{}/sqlite", data_dir_display),
"seed": format!("{}/seed.pem", data_dir_display),
"monero-wallet-rpc": format!("{}/monero", data_dir_display),
"bitcoin_wallet": format!("{}/wallet", data_dir_display),
}))
}
Method::WithdrawBtc { address, amount } => {
let bitcoin_wallet = context
.bitcoin_wallet
.as_ref()
.context("Could not get Bitcoin wallet")?;
let amount = match amount {
Some(amount) => amount,
None => {
bitcoin_wallet
.max_giveable(address.script_pubkey().len())
.await?
}
};
let psbt = bitcoin_wallet
.send_to_address(address, amount, None)
.await?;
let signed_tx = bitcoin_wallet.sign_and_finalize(psbt).await?;
bitcoin_wallet
.broadcast(signed_tx.clone(), "withdraw")
.await?;
Ok(json!({
"signed_tx": signed_tx,
"amount": amount.to_sat(),
"txid": signed_tx.txid(),
}))
}
Method::StartDaemon { server_address } => {
// Default to 127.0.0.1:1234
let server_address = server_address.unwrap_or("127.0.0.1:1234".parse()?);
let (addr, server_handle) =
rpc::run_server(server_address, Arc::clone(&context)).await?;
tracing::info!(%addr, "Started RPC server");
server_handle.stopped().await;
tracing::info!("Stopped RPC server");
Ok(json!({}))
}
Method::Balance { force_refresh } => {
let bitcoin_wallet = context
.bitcoin_wallet
.as_ref()
.context("Could not get Bitcoin wallet")?;
if force_refresh {
bitcoin_wallet.sync().await?;
}
let bitcoin_balance = bitcoin_wallet.balance().await?;
if force_refresh {
tracing::info!(
balance = %bitcoin_balance,
"Checked Bitcoin balance",
);
} else {
tracing::debug!(
balance = %bitcoin_balance,
"Current Bitcoin balance as of last sync",
);
}
Ok(json!({
"balance": bitcoin_balance.to_sat()
}))
}
Method::ListSellers { rendezvous_point } => {
let rendezvous_node_peer_id = rendezvous_point
.extract_peer_id()
.context("Rendezvous node address must contain peer ID")?;
let identity = context
.config
.seed
.as_ref()
.context("Cannot extract seed")?
.derive_libp2p_identity();
let sellers = list_sellers(
rendezvous_node_peer_id,
rendezvous_point,
context.config.namespace,
context.config.tor_socks5_port,
identity,
)
.await?;
for seller in &sellers {
match seller.status {
SellerStatus::Online(quote) => {
tracing::info!(
price = %quote.price.to_string(),
min_quantity = %quote.min_quantity.to_string(),
max_quantity = %quote.max_quantity.to_string(),
status = "Online",
address = %seller.multiaddr.to_string(),
"Fetched peer status"
);
}
SellerStatus::Unreachable => {
tracing::info!(
status = "Unreachable",
address = %seller.multiaddr.to_string(),
"Fetched peer status"
);
}
}
}
Ok(json!({ "sellers": sellers }))
}
Method::ExportBitcoinWallet => {
let bitcoin_wallet = context
.bitcoin_wallet
.as_ref()
.context("Could not get Bitcoin wallet")?;
let wallet_export = bitcoin_wallet.wallet_export("cli").await?;
tracing::info!(descriptor=%wallet_export.to_string(), "Exported bitcoin wallet");
Ok(json!({
"descriptor": wallet_export.to_string(),
}))
}
Method::MoneroRecovery { swap_id } => {
let swap_state: BobState = context.db.get_state(swap_id).await?.try_into()?;
if let BobState::BtcRedeemed(state5) = swap_state {
let (spend_key, view_key) = state5.xmr_keys();
let restore_height = state5.monero_wallet_restore_blockheight.height;
let address = monero::Address::standard(
context.config.env_config.monero_network,
monero::PublicKey::from_private_key(&spend_key),
monero::PublicKey::from(view_key.public()),
);
tracing::info!(restore_height=%restore_height, address=%address, spend_key=%spend_key, view_key=%view_key, "Monero recovery information");
Ok(json!({
"address": address,
"spend_key": spend_key.to_string(),
"view_key": view_key.to_string(),
"restore_height": state5.monero_wallet_restore_blockheight.height,
}))
} else {
bail!(
"Cannot print monero recovery information in state {}, only possible for BtcRedeemed",
swap_state
)
}
}
Method::GetCurrentSwap => Ok(json!({
"swap_id": context.swap_lock.get_current_swap_id().await
})),
}
}
pub async fn call(self, context: Arc<Context>) -> Result<serde_json::Value> {
let method_span = self.cmd.get_tracing_span(self.log_reference.clone());
self.handle_cmd(context)
.instrument(method_span.clone())
.await
.map_err(|err| {
method_span.in_scope(|| {
tracing::debug!(err = format!("{:?}", err), "API call resulted in an error");
});
err
})
}
}
fn qr_code(value: &impl ToString) -> Result<String> {
let code = QrCode::new(value.to_string())?;
let qr_code = code
.render::<unicode::Dense1x2>()
.dark_color(unicode::Dense1x2::Light)
.light_color(unicode::Dense1x2::Dark)
.build();
Ok(qr_code)
}
pub async fn determine_btc_to_swap<FB, TB, FMG, TMG, FS, TS, FFE, TFE>(
json: bool,
bid_quote: BidQuote,
get_new_address: impl Future<Output = Result<bitcoin::Address>>,
balance: FB,
max_giveable_fn: FMG,
sync: FS,
estimate_fee: FFE,
) -> Result<(Amount, Amount)>
where
TB: Future<Output = Result<Amount>>,
FB: Fn() -> TB,
TMG: Future<Output = Result<Amount>>,
FMG: Fn() -> TMG,
TS: Future<Output = Result<()>>,
FS: Fn() -> TS,
FFE: Fn(Amount) -> TFE,
TFE: Future<Output = Result<Amount>>,
{
if bid_quote.max_quantity == Amount::ZERO {
bail!(ZeroQuoteReceived)
}
tracing::info!(
price = %bid_quote.price,
minimum_amount = %bid_quote.min_quantity,
maximum_amount = %bid_quote.max_quantity,
"Received quote",
);
sync().await?;
let mut max_giveable = max_giveable_fn().await?;
if max_giveable == Amount::ZERO || max_giveable < bid_quote.min_quantity {
let deposit_address = get_new_address.await?;
let minimum_amount = bid_quote.min_quantity;
let maximum_amount = bid_quote.max_quantity;
if !json {
eprintln!("{}", qr_code(&deposit_address)?);
}
loop {
let min_outstanding = bid_quote.min_quantity - max_giveable;
let min_fee = estimate_fee(min_outstanding).await?;
let min_deposit = min_outstanding + min_fee;
tracing::info!(
"Deposit at least {} to cover the min quantity with fee!",
min_deposit
);
tracing::info!(
%deposit_address,
%min_deposit,
%max_giveable,
%minimum_amount,
%maximum_amount,
"Waiting for Bitcoin deposit",
);
max_giveable = loop {
sync().await?;
let new_max_givable = max_giveable_fn().await?;
if new_max_givable > max_giveable {
break new_max_givable;
}
tokio::time::sleep(Duration::from_secs(1)).await;
};
let new_balance = balance().await?;
tracing::info!(%new_balance, %max_giveable, "Received Bitcoin");
if max_giveable < bid_quote.min_quantity {
tracing::info!("Deposited amount is less than `min_quantity`");
continue;
}
break;
}
};
let balance = balance().await?;
let fees = balance - max_giveable;
let max_accepted = bid_quote.max_quantity;
let btc_swap_amount = min(max_giveable, max_accepted);
Ok((btc_swap_amount, fees))
}

View File

@ -12,43 +12,15 @@
#![forbid(unsafe_code)]
#![allow(non_snake_case)]
use anyhow::{bail, Context, Result};
use comfy_table::Table;
use qrcode::render::unicode;
use qrcode::QrCode;
use std::cmp::min;
use std::convert::TryInto;
use anyhow::Result;
use std::env;
use std::future::Future;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use swap::bitcoin::TxLock;
use swap::cli::command::{parse_args_and_apply_defaults, Arguments, Command, ParseResult};
use swap::cli::{list_sellers, EventLoop, SellerStatus};
use swap::cli::command::{parse_args_and_apply_defaults, ParseResult};
use swap::common::check_latest_version;
use swap::database::open_db;
use swap::env::Config;
use swap::libp2p_ext::MultiAddrExt;
use swap::network::quote::{BidQuote, ZeroQuoteReceived};
use swap::network::swarm;
use swap::protocol::bob;
use swap::protocol::bob::{BobState, Swap};
use swap::seed::Seed;
use swap::{bitcoin, cli, monero};
use url::Url;
use uuid::Uuid;
#[tokio::main]
async fn main() -> Result<()> {
let Arguments {
env_config,
data_dir,
debug,
json,
cmd,
} = match parse_args_and_apply_defaults(env::args_os())? {
ParseResult::Arguments(args) => *args,
let (context, request) = match parse_args_and_apply_defaults(env::args_os()).await? {
ParseResult::Context(context, request) => (context, request),
ParseResult::PrintAndExitZero { message } => {
println!("{}", message);
std::process::exit(0);
@ -58,601 +30,19 @@ async fn main() -> Result<()> {
if let Err(e) = check_latest_version(env!("CARGO_PKG_VERSION")).await {
eprintln!("{}", e);
}
match cmd {
Command::BuyXmr {
seller,
bitcoin_electrum_rpc_url,
bitcoin_target_block,
bitcoin_change_address,
monero_receive_address,
monero_daemon_address,
tor_socks5_port,
namespace,
} => {
let swap_id = Uuid::new_v4();
cli::tracing::init(debug, json, data_dir.join("logs"), Some(swap_id))?;
let db = open_db(data_dir.join("sqlite")).await?;
let seed = Seed::from_file_or_generate(data_dir.as_path())
.context("Failed to read in seed file")?;
let bitcoin_wallet = init_bitcoin_wallet(
bitcoin_electrum_rpc_url,
&seed,
data_dir.clone(),
env_config,
bitcoin_target_block,
)
.await?;
let (monero_wallet, _process) =
init_monero_wallet(data_dir, monero_daemon_address, env_config).await?;
let bitcoin_wallet = Arc::new(bitcoin_wallet);
let seller_peer_id = seller
.extract_peer_id()
.context("Seller address must contain peer ID")?;
db.insert_address(seller_peer_id, seller.clone()).await?;
let behaviour = cli::Behaviour::new(
seller_peer_id,
env_config,
bitcoin_wallet.clone(),
(seed.derive_libp2p_identity(), namespace),
);
let mut swarm =
swarm::cli(seed.derive_libp2p_identity(), tor_socks5_port, behaviour).await?;
swarm.behaviour_mut().add_address(seller_peer_id, seller);
tracing::debug!(peer_id = %swarm.local_peer_id(), "Network layer initialized");
let (event_loop, mut event_loop_handle) =
EventLoop::new(swap_id, swarm, seller_peer_id)?;
let event_loop = tokio::spawn(event_loop.run());
let max_givable = || bitcoin_wallet.max_giveable(TxLock::script_size());
let estimate_fee = |amount| bitcoin_wallet.estimate_fee(TxLock::weight(), amount);
let (amount, fees) = match determine_btc_to_swap(
json,
event_loop_handle.request_quote(),
bitcoin_wallet.new_address(),
|| bitcoin_wallet.balance(),
max_givable,
|| bitcoin_wallet.sync(),
estimate_fee,
)
.await
{
Ok(val) => val,
Err(error) => match error.downcast::<ZeroQuoteReceived>() {
Ok(_) => {
bail!("Seller's XMR balance is currently too low to initiate a swap, please try again later")
}
Err(other) => bail!(other),
},
};
tracing::info!(%amount, %fees, "Determined swap amount");
db.insert_peer_id(swap_id, seller_peer_id).await?;
db.insert_monero_address(swap_id, monero_receive_address)
.await?;
let swap = Swap::new(
db,
swap_id,
bitcoin_wallet,
Arc::new(monero_wallet),
env_config,
event_loop_handle,
monero_receive_address,
bitcoin_change_address,
amount,
);
tokio::select! {
result = event_loop => {
result
.context("EventLoop panicked")?;
},
result = bob::run(swap) => {
result.context("Failed to complete swap")?;
}
}
}
Command::History => {
cli::tracing::init(debug, json, data_dir.join("logs"), None)?;
let db = open_db(data_dir.join("sqlite")).await?;
let swaps = db.all().await?;
if json {
for (swap_id, state) in swaps {
let state: BobState = state.try_into()?;
tracing::info!(swap_id=%swap_id.to_string(), state=%state.to_string(), "Read swap state from database");
}
} else {
let mut table = Table::new();
table.set_header(vec!["SWAP ID", "STATE"]);
for (swap_id, state) in swaps {
let state: BobState = state.try_into()?;
table.add_row(vec![swap_id.to_string(), state.to_string()]);
}
println!("{}", table);
}
}
Command::Config => {
cli::tracing::init(debug, json, data_dir.join("logs"), None)?;
tracing::info!(path=%data_dir.display(), "Data directory");
tracing::info!(path=%format!("{}/logs", data_dir.display()), "Log files directory");
tracing::info!(path=%format!("{}/sqlite", data_dir.display()), "Sqlite file location");
tracing::info!(path=%format!("{}/seed.pem", data_dir.display()), "Seed file location");
tracing::info!(path=%format!("{}/monero", data_dir.display()), "Monero-wallet-rpc directory");
tracing::info!(path=%format!("{}/wallet", data_dir.display()), "Internal bitcoin wallet directory");
}
Command::WithdrawBtc {
bitcoin_electrum_rpc_url,
bitcoin_target_block,
amount,
address,
} => {
cli::tracing::init(debug, json, data_dir.join("logs"), None)?;
let seed = Seed::from_file_or_generate(data_dir.as_path())
.context("Failed to read in seed file")?;
let bitcoin_wallet = init_bitcoin_wallet(
bitcoin_electrum_rpc_url,
&seed,
data_dir.clone(),
env_config,
bitcoin_target_block,
)
.await?;
let amount = match amount {
Some(amount) => amount,
None => {
bitcoin_wallet
.max_giveable(address.script_pubkey().len())
.await?
}
};
let psbt = bitcoin_wallet
.send_to_address(address, amount, None)
.await?;
let signed_tx = bitcoin_wallet.sign_and_finalize(psbt).await?;
bitcoin_wallet.broadcast(signed_tx, "withdraw").await?;
}
Command::Balance {
bitcoin_electrum_rpc_url,
bitcoin_target_block,
} => {
cli::tracing::init(debug, json, data_dir.join("logs"), None)?;
let seed = Seed::from_file_or_generate(data_dir.as_path())
.context("Failed to read in seed file")?;
let bitcoin_wallet = init_bitcoin_wallet(
bitcoin_electrum_rpc_url,
&seed,
data_dir.clone(),
env_config,
bitcoin_target_block,
)
.await?;
let bitcoin_balance = bitcoin_wallet.balance().await?;
tracing::info!(
balance = %bitcoin_balance,
"Checked Bitcoin balance",
);
}
Command::Resume {
swap_id,
bitcoin_electrum_rpc_url,
bitcoin_target_block,
monero_daemon_address,
tor_socks5_port,
namespace,
} => {
cli::tracing::init(debug, json, data_dir.join("logs"), Some(swap_id))?;
let db = open_db(data_dir.join("sqlite")).await?;
let seed = Seed::from_file_or_generate(data_dir.as_path())
.context("Failed to read in seed file")?;
let bitcoin_wallet = init_bitcoin_wallet(
bitcoin_electrum_rpc_url,
&seed,
data_dir.clone(),
env_config,
bitcoin_target_block,
)
.await?;
let (monero_wallet, _process) =
init_monero_wallet(data_dir, monero_daemon_address, env_config).await?;
let bitcoin_wallet = Arc::new(bitcoin_wallet);
let seller_peer_id = db.get_peer_id(swap_id).await?;
let seller_addresses = db.get_addresses(seller_peer_id).await?;
let behaviour = cli::Behaviour::new(
seller_peer_id,
env_config,
bitcoin_wallet.clone(),
(seed.derive_libp2p_identity(), namespace),
);
let mut swarm =
swarm::cli(seed.derive_libp2p_identity(), tor_socks5_port, behaviour).await?;
let our_peer_id = swarm.local_peer_id();
tracing::debug!(peer_id = %our_peer_id, "Network layer initialized");
for seller_address in seller_addresses {
swarm
.behaviour_mut()
.add_address(seller_peer_id, seller_address);
}
let (event_loop, event_loop_handle) = EventLoop::new(swap_id, swarm, seller_peer_id)?;
let handle = tokio::spawn(event_loop.run());
let monero_receive_address = db.get_monero_address(swap_id).await?;
let swap = Swap::from_db(
db,
swap_id,
bitcoin_wallet,
Arc::new(monero_wallet),
env_config,
event_loop_handle,
monero_receive_address,
)
.await?;
tokio::select! {
event_loop_result = handle => {
event_loop_result?;
},
swap_result = bob::run(swap) => {
swap_result?;
}
}
}
Command::CancelAndRefund {
swap_id,
bitcoin_electrum_rpc_url,
bitcoin_target_block,
} => {
cli::tracing::init(debug, json, data_dir.join("logs"), Some(swap_id))?;
let db = open_db(data_dir.join("sqlite")).await?;
let seed = Seed::from_file_or_generate(data_dir.as_path())
.context("Failed to read in seed file")?;
let bitcoin_wallet = init_bitcoin_wallet(
bitcoin_electrum_rpc_url,
&seed,
data_dir,
env_config,
bitcoin_target_block,
)
.await?;
cli::cancel_and_refund(swap_id, Arc::new(bitcoin_wallet), db).await?;
}
Command::ListSellers {
rendezvous_point,
namespace,
tor_socks5_port,
} => {
let rendezvous_node_peer_id = rendezvous_point
.extract_peer_id()
.context("Rendezvous node address must contain peer ID")?;
cli::tracing::init(debug, json, data_dir.join("logs"), None)?;
let seed = Seed::from_file_or_generate(data_dir.as_path())
.context("Failed to read in seed file")?;
let identity = seed.derive_libp2p_identity();
let sellers = list_sellers(
rendezvous_node_peer_id,
rendezvous_point,
namespace,
tor_socks5_port,
identity,
)
.await?;
if json {
for seller in sellers {
match seller.status {
SellerStatus::Online(quote) => {
tracing::info!(
price = %quote.price.to_string(),
min_quantity = %quote.min_quantity.to_string(),
max_quantity = %quote.max_quantity.to_string(),
status = "Online",
address = %seller.multiaddr.to_string(),
"Fetched peer status"
);
}
SellerStatus::Unreachable => {
tracing::info!(
status = "Unreachable",
address = %seller.multiaddr.to_string(),
"Fetched peer status"
);
}
}
}
} else {
let mut table = Table::new();
table.set_header(vec![
"PRICE",
"MIN_QUANTITY",
"MAX_QUANTITY",
"STATUS",
"ADDRESS",
]);
for seller in sellers {
let row = match seller.status {
SellerStatus::Online(quote) => {
vec![
quote.price.to_string(),
quote.min_quantity.to_string(),
quote.max_quantity.to_string(),
"Online".to_owned(),
seller.multiaddr.to_string(),
]
}
SellerStatus::Unreachable => {
vec![
"???".to_owned(),
"???".to_owned(),
"???".to_owned(),
"Unreachable".to_owned(),
seller.multiaddr.to_string(),
]
}
};
table.add_row(row);
}
println!("{}", table);
}
}
Command::ExportBitcoinWallet {
bitcoin_electrum_rpc_url,
bitcoin_target_block,
} => {
cli::tracing::init(debug, json, data_dir.join("logs"), None)?;
let seed = Seed::from_file_or_generate(data_dir.as_path())
.context("Failed to read in seed file")?;
let bitcoin_wallet = init_bitcoin_wallet(
bitcoin_electrum_rpc_url,
&seed,
data_dir.clone(),
env_config,
bitcoin_target_block,
)
.await?;
let wallet_export = bitcoin_wallet.wallet_export("cli").await?;
tracing::info!(descriptor=%wallet_export.to_string(), "Exported bitcoin wallet");
}
Command::MoneroRecovery { swap_id } => {
cli::tracing::init(debug, json, data_dir.join("logs"), Some(swap_id))?;
let db = open_db(data_dir.join("sqlite")).await?;
let swap_state: BobState = db.get_state(swap_id).await?.try_into()?;
match swap_state {
BobState::Started { .. }
| BobState::SwapSetupCompleted(_)
| BobState::BtcLocked { .. }
| BobState::XmrLockProofReceived { .. }
| BobState::XmrLocked(_)
| BobState::EncSigSent(_)
| BobState::CancelTimelockExpired(_)
| BobState::BtcCancelled(_)
| BobState::BtcRefunded(_)
| BobState::BtcPunished { .. }
| BobState::SafelyAborted
| BobState::XmrRedeemed { .. } => {
bail!("Cannot print monero recovery information in state {}, only possible for BtcRedeemed", swap_state)
}
BobState::BtcRedeemed(state5) => {
let (spend_key, view_key) = state5.xmr_keys();
let address = monero::Address::standard(
env_config.monero_network,
monero::PublicKey::from_private_key(&spend_key),
monero::PublicKey::from(view_key.public()),
);
tracing::info!("Wallet address: {}", address.to_string());
let view_key = serde_json::to_string(&view_key)?;
println!("View key: {}", view_key);
println!("Spend key: {}", spend_key);
}
}
}
};
request.call(context.clone()).await?;
context.tasks.wait_for_tasks().await?;
Ok(())
}
async fn init_bitcoin_wallet(
electrum_rpc_url: Url,
seed: &Seed,
data_dir: PathBuf,
env_config: Config,
bitcoin_target_block: usize,
) -> Result<bitcoin::Wallet> {
tracing::debug!("Initializing bitcoin wallet");
let xprivkey = seed.derive_extended_private_key(env_config.bitcoin_network)?;
let wallet = bitcoin::Wallet::new(
electrum_rpc_url.clone(),
data_dir,
xprivkey,
env_config,
bitcoin_target_block,
)
.await
.context("Failed to initialize Bitcoin wallet")?;
tracing::debug!("Syncing bitcoin wallet");
wallet.sync().await?;
Ok(wallet)
}
async fn init_monero_wallet(
data_dir: PathBuf,
monero_daemon_address: Option<String>,
env_config: Config,
) -> Result<(monero::Wallet, monero::WalletRpcProcess)> {
let network = env_config.monero_network;
const MONERO_BLOCKCHAIN_MONITORING_WALLET_NAME: &str = "swap-tool-blockchain-monitoring-wallet";
let monero_wallet_rpc = monero::WalletRpc::new(data_dir.join("monero")).await?;
let monero_wallet_rpc_process = monero_wallet_rpc
.run(network, monero_daemon_address)
.await?;
let monero_wallet = monero::Wallet::open_or_create(
monero_wallet_rpc_process.endpoint(),
MONERO_BLOCKCHAIN_MONITORING_WALLET_NAME.to_string(),
env_config,
)
.await?;
Ok((monero_wallet, monero_wallet_rpc_process))
}
fn qr_code(value: &impl ToString) -> Result<String> {
let code = QrCode::new(value.to_string())?;
let qr_code = code
.render::<unicode::Dense1x2>()
.dark_color(unicode::Dense1x2::Light)
.light_color(unicode::Dense1x2::Dark)
.build();
Ok(qr_code)
}
async fn determine_btc_to_swap<FB, TB, FMG, TMG, FS, TS, FFE, TFE>(
json: bool,
bid_quote: impl Future<Output = Result<BidQuote>>,
get_new_address: impl Future<Output = Result<bitcoin::Address>>,
balance: FB,
max_giveable_fn: FMG,
sync: FS,
estimate_fee: FFE,
) -> Result<(bitcoin::Amount, bitcoin::Amount)>
where
TB: Future<Output = Result<bitcoin::Amount>>,
FB: Fn() -> TB,
TMG: Future<Output = Result<bitcoin::Amount>>,
FMG: Fn() -> TMG,
TS: Future<Output = Result<()>>,
FS: Fn() -> TS,
FFE: Fn(bitcoin::Amount) -> TFE,
TFE: Future<Output = Result<bitcoin::Amount>>,
{
tracing::debug!("Requesting quote");
let bid_quote = bid_quote.await?;
if bid_quote.max_quantity == bitcoin::Amount::ZERO {
bail!(ZeroQuoteReceived)
}
tracing::info!(
price = %bid_quote.price,
minimum_amount = %bid_quote.min_quantity,
maximum_amount = %bid_quote.max_quantity,
"Received quote",
);
let mut max_giveable = max_giveable_fn().await?;
if max_giveable == bitcoin::Amount::ZERO || max_giveable < bid_quote.min_quantity {
let deposit_address = get_new_address.await?;
let minimum_amount = bid_quote.min_quantity;
let maximum_amount = bid_quote.max_quantity;
if !json {
eprintln!("{}", qr_code(&deposit_address)?);
}
loop {
let min_outstanding = bid_quote.min_quantity - max_giveable;
let min_fee = estimate_fee(min_outstanding).await?;
let min_deposit = min_outstanding + min_fee;
tracing::info!(
"Deposit at least {} to cover the min quantity with fee!",
min_deposit
);
tracing::info!(
%deposit_address,
%min_deposit,
%max_giveable,
%minimum_amount,
%maximum_amount,
"Waiting for Bitcoin deposit",
);
max_giveable = loop {
sync().await?;
let new_max_givable = max_giveable_fn().await?;
if new_max_givable > max_giveable {
break new_max_givable;
}
tokio::time::sleep(Duration::from_secs(1)).await;
};
let new_balance = balance().await?;
tracing::info!(%new_balance, %max_giveable, "Received Bitcoin");
if max_giveable < bid_quote.min_quantity {
tracing::info!("Deposited amount is less than `min_quantity`");
continue;
}
break;
}
};
let balance = balance().await?;
let fees = balance - max_giveable;
let max_accepted = bid_quote.max_quantity;
let btc_swap_amount = min(max_giveable, max_accepted);
Ok((btc_swap_amount, fees))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::determine_btc_to_swap;
use ::bitcoin::Amount;
use std::sync::Mutex;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use swap::api::request::determine_btc_to_swap;
use swap::network::quote::BidQuote;
use swap::tracing_ext::capture_logs;
use tracing::level_filters::LevelFilter;
@ -666,7 +56,7 @@ mod tests {
let (amount, fees) = determine_btc_to_swap(
true,
async { Ok(quote_with_max(0.01)) },
quote_with_max(0.01),
get_dummy_address(),
|| async { Ok(Amount::from_btc(0.001)?) },
|| async {
@ -685,10 +75,10 @@ mod tests {
assert_eq!((amount, fees), (expected_amount, expected_fees));
assert_eq!(
writer.captured(),
r" INFO swap: Received quote price=0.00100000 BTC minimum_amount=0.00000000 BTC maximum_amount=0.01000000 BTC
INFO swap: Deposit at least 0.00001000 BTC to cover the min quantity with fee!
INFO swap: Waiting for Bitcoin deposit deposit_address=1PdfytjS7C8wwd9Lq5o4x9aXA2YRqaCpH6 min_deposit=0.00001000 BTC max_giveable=0.00000000 BTC minimum_amount=0.00000000 BTC maximum_amount=0.01000000 BTC
INFO swap: Received Bitcoin new_balance=0.00100000 BTC max_giveable=0.00090000 BTC
r" INFO swap::api::request: Received quote price=0.001 BTC minimum_amount=0 BTC maximum_amount=0.01 BTC
INFO swap::api::request: Deposit at least 0.00001 BTC to cover the min quantity with fee!
INFO swap::api::request: Waiting for Bitcoin deposit deposit_address=1PdfytjS7C8wwd9Lq5o4x9aXA2YRqaCpH6 min_deposit=0.00001 BTC max_giveable=0 BTC minimum_amount=0 BTC maximum_amount=0.01 BTC
INFO swap::api::request: Received Bitcoin new_balance=0.001 BTC max_giveable=0.0009 BTC
"
);
}
@ -703,7 +93,7 @@ mod tests {
let (amount, fees) = determine_btc_to_swap(
true,
async { Ok(quote_with_max(0.01)) },
quote_with_max(0.01),
get_dummy_address(),
|| async { Ok(Amount::from_btc(0.1001)?) },
|| async {
@ -722,10 +112,10 @@ mod tests {
assert_eq!((amount, fees), (expected_amount, expected_fees));
assert_eq!(
writer.captured(),
r" INFO swap: Received quote price=0.00100000 BTC minimum_amount=0.00000000 BTC maximum_amount=0.01000000 BTC
INFO swap: Deposit at least 0.00001000 BTC to cover the min quantity with fee!
INFO swap: Waiting for Bitcoin deposit deposit_address=1PdfytjS7C8wwd9Lq5o4x9aXA2YRqaCpH6 min_deposit=0.00001000 BTC max_giveable=0.00000000 BTC minimum_amount=0.00000000 BTC maximum_amount=0.01000000 BTC
INFO swap: Received Bitcoin new_balance=0.10010000 BTC max_giveable=0.10000000 BTC
r" INFO swap::api::request: Received quote price=0.001 BTC minimum_amount=0 BTC maximum_amount=0.01 BTC
INFO swap::api::request: Deposit at least 0.00001 BTC to cover the min quantity with fee!
INFO swap::api::request: Waiting for Bitcoin deposit deposit_address=1PdfytjS7C8wwd9Lq5o4x9aXA2YRqaCpH6 min_deposit=0.00001 BTC max_giveable=0 BTC minimum_amount=0 BTC maximum_amount=0.01 BTC
INFO swap::api::request: Received Bitcoin new_balance=0.1001 BTC max_giveable=0.1 BTC
"
);
}
@ -740,7 +130,7 @@ mod tests {
let (amount, fees) = determine_btc_to_swap(
true,
async { Ok(quote_with_max(0.01)) },
quote_with_max(0.01),
async { panic!("should not request new address when initial balance is > 0") },
|| async { Ok(Amount::from_btc(0.005)?) },
|| async {
@ -759,7 +149,7 @@ mod tests {
assert_eq!((amount, fees), (expected_amount, expected_fees));
assert_eq!(
writer.captured(),
" INFO swap: Received quote price=0.00100000 BTC minimum_amount=0.00000000 BTC maximum_amount=0.01000000 BTC\n"
" INFO swap::api::request: Received quote price=0.001 BTC minimum_amount=0 BTC maximum_amount=0.01 BTC\n"
);
}
@ -773,7 +163,7 @@ mod tests {
let (amount, fees) = determine_btc_to_swap(
true,
async { Ok(quote_with_max(0.01)) },
quote_with_max(0.01),
async { panic!("should not request new address when initial balance is > 0") },
|| async { Ok(Amount::from_btc(0.1001)?) },
|| async {
@ -792,7 +182,7 @@ mod tests {
assert_eq!((amount, fees), (expected_amount, expected_fees));
assert_eq!(
writer.captured(),
" INFO swap: Received quote price=0.00100000 BTC minimum_amount=0.00000000 BTC maximum_amount=0.01000000 BTC\n"
" INFO swap::api::request: Received quote price=0.001 BTC minimum_amount=0 BTC maximum_amount=0.01 BTC\n"
);
}
@ -806,7 +196,7 @@ mod tests {
let (amount, fees) = determine_btc_to_swap(
true,
async { Ok(quote_with_min(0.01)) },
quote_with_min(0.01),
get_dummy_address(),
|| async { Ok(Amount::from_btc(0.0101)?) },
|| async {
@ -825,10 +215,10 @@ mod tests {
assert_eq!((amount, fees), (expected_amount, expected_fees));
assert_eq!(
writer.captured(),
r" INFO swap: Received quote price=0.00100000 BTC minimum_amount=0.01000000 BTC maximum_amount=184467440737.09551615 BTC
INFO swap: Deposit at least 0.01001000 BTC to cover the min quantity with fee!
INFO swap: Waiting for Bitcoin deposit deposit_address=1PdfytjS7C8wwd9Lq5o4x9aXA2YRqaCpH6 min_deposit=0.01001000 BTC max_giveable=0.00000000 BTC minimum_amount=0.01000000 BTC maximum_amount=184467440737.09551615 BTC
INFO swap: Received Bitcoin new_balance=0.01010000 BTC max_giveable=0.01000000 BTC
r" INFO swap::api::request: Received quote price=0.001 BTC minimum_amount=0.01 BTC maximum_amount=184467440737.09551615 BTC
INFO swap::api::request: Deposit at least 0.01001 BTC to cover the min quantity with fee!
INFO swap::api::request: Waiting for Bitcoin deposit deposit_address=1PdfytjS7C8wwd9Lq5o4x9aXA2YRqaCpH6 min_deposit=0.01001 BTC max_giveable=0 BTC minimum_amount=0.01 BTC maximum_amount=184467440737.09551615 BTC
INFO swap::api::request: Received Bitcoin new_balance=0.0101 BTC max_giveable=0.01 BTC
"
);
}
@ -843,7 +233,7 @@ mod tests {
let (amount, fees) = determine_btc_to_swap(
true,
async { Ok(quote_with_min(0.01)) },
quote_with_min(0.01),
get_dummy_address(),
|| async { Ok(Amount::from_btc(0.0101)?) },
|| async {
@ -862,10 +252,10 @@ mod tests {
assert_eq!((amount, fees), (expected_amount, expected_fees));
assert_eq!(
writer.captured(),
r" INFO swap: Received quote price=0.00100000 BTC minimum_amount=0.01000000 BTC maximum_amount=184467440737.09551615 BTC
INFO swap: Deposit at least 0.00991000 BTC to cover the min quantity with fee!
INFO swap: Waiting for Bitcoin deposit deposit_address=1PdfytjS7C8wwd9Lq5o4x9aXA2YRqaCpH6 min_deposit=0.00991000 BTC max_giveable=0.00010000 BTC minimum_amount=0.01000000 BTC maximum_amount=184467440737.09551615 BTC
INFO swap: Received Bitcoin new_balance=0.01010000 BTC max_giveable=0.01000000 BTC
r" INFO swap::api::request: Received quote price=0.001 BTC minimum_amount=0.01 BTC maximum_amount=184467440737.09551615 BTC
INFO swap::api::request: Deposit at least 0.00991 BTC to cover the min quantity with fee!
INFO swap::api::request: Waiting for Bitcoin deposit deposit_address=1PdfytjS7C8wwd9Lq5o4x9aXA2YRqaCpH6 min_deposit=0.00991 BTC max_giveable=0.0001 BTC minimum_amount=0.01 BTC maximum_amount=184467440737.09551615 BTC
INFO swap::api::request: Received Bitcoin new_balance=0.0101 BTC max_giveable=0.01 BTC
"
);
}
@ -885,7 +275,7 @@ mod tests {
Duration::from_secs(1),
determine_btc_to_swap(
true,
async { Ok(quote_with_min(0.1)) },
quote_with_min(0.1),
get_dummy_address(),
|| async { Ok(Amount::from_btc(0.0101)?) },
|| async {
@ -902,13 +292,13 @@ mod tests {
assert!(matches!(error, tokio::time::error::Elapsed { .. }));
assert_eq!(
writer.captured(),
r" INFO swap: Received quote price=0.00100000 BTC minimum_amount=0.10000000 BTC maximum_amount=184467440737.09551615 BTC
INFO swap: Deposit at least 0.10001000 BTC to cover the min quantity with fee!
INFO swap: Waiting for Bitcoin deposit deposit_address=1PdfytjS7C8wwd9Lq5o4x9aXA2YRqaCpH6 min_deposit=0.10001000 BTC max_giveable=0.00000000 BTC minimum_amount=0.10000000 BTC maximum_amount=184467440737.09551615 BTC
INFO swap: Received Bitcoin new_balance=0.01010000 BTC max_giveable=0.01000000 BTC
INFO swap: Deposited amount is less than `min_quantity`
INFO swap: Deposit at least 0.09001000 BTC to cover the min quantity with fee!
INFO swap: Waiting for Bitcoin deposit deposit_address=1PdfytjS7C8wwd9Lq5o4x9aXA2YRqaCpH6 min_deposit=0.09001000 BTC max_giveable=0.01000000 BTC minimum_amount=0.10000000 BTC maximum_amount=184467440737.09551615 BTC
r" INFO swap::api::request: Received quote price=0.001 BTC minimum_amount=0.1 BTC maximum_amount=184467440737.09551615 BTC
INFO swap::api::request: Deposit at least 0.10001 BTC to cover the min quantity with fee!
INFO swap::api::request: Waiting for Bitcoin deposit deposit_address=1PdfytjS7C8wwd9Lq5o4x9aXA2YRqaCpH6 min_deposit=0.10001 BTC max_giveable=0 BTC minimum_amount=0.1 BTC maximum_amount=184467440737.09551615 BTC
INFO swap::api::request: Received Bitcoin new_balance=0.0101 BTC max_giveable=0.01 BTC
INFO swap::api::request: Deposited amount is less than `min_quantity`
INFO swap::api::request: Deposit at least 0.09001 BTC to cover the min quantity with fee!
INFO swap::api::request: Waiting for Bitcoin deposit deposit_address=1PdfytjS7C8wwd9Lq5o4x9aXA2YRqaCpH6 min_deposit=0.09001 BTC max_giveable=0.01 BTC minimum_amount=0.1 BTC maximum_amount=184467440737.09551615 BTC
"
);
}
@ -933,7 +323,7 @@ mod tests {
Duration::from_secs(10),
determine_btc_to_swap(
true,
async { Ok(quote_with_min(0.1)) },
quote_with_min(0.1),
get_dummy_address(),
|| async { Ok(Amount::from_btc(0.21)?) },
|| async {
@ -951,10 +341,10 @@ mod tests {
assert_eq!(
writer.captured(),
r" INFO swap: Received quote price=0.00100000 BTC minimum_amount=0.10000000 BTC maximum_amount=184467440737.09551615 BTC
INFO swap: Deposit at least 0.10001000 BTC to cover the min quantity with fee!
INFO swap: Waiting for Bitcoin deposit deposit_address=1PdfytjS7C8wwd9Lq5o4x9aXA2YRqaCpH6 min_deposit=0.10001000 BTC max_giveable=0.00000000 BTC minimum_amount=0.10000000 BTC maximum_amount=184467440737.09551615 BTC
INFO swap: Received Bitcoin new_balance=0.21000000 BTC max_giveable=0.20000000 BTC
r" INFO swap::api::request: Received quote price=0.001 BTC minimum_amount=0.1 BTC maximum_amount=184467440737.09551615 BTC
INFO swap::api::request: Deposit at least 0.10001 BTC to cover the min quantity with fee!
INFO swap::api::request: Waiting for Bitcoin deposit deposit_address=1PdfytjS7C8wwd9Lq5o4x9aXA2YRqaCpH6 min_deposit=0.10001 BTC max_giveable=0 BTC minimum_amount=0.1 BTC maximum_amount=184467440737.09551615 BTC
INFO swap::api::request: Received Bitcoin new_balance=0.21 BTC max_giveable=0.2 BTC
"
);
}
@ -968,7 +358,7 @@ mod tests {
let determination_error = determine_btc_to_swap(
true,
async { Ok(quote_with_max(0.00)) },
quote_with_max(0.00),
get_dummy_address(),
|| async { Ok(Amount::from_btc(0.0101)?) },
|| async {

View File

@ -15,7 +15,7 @@ pub use crate::bitcoin::refund::TxRefund;
pub use crate::bitcoin::timelocks::{BlockHeight, ExpiredTimelocks};
pub use ::bitcoin::util::amount::Amount;
pub use ::bitcoin::util::psbt::PartiallySignedTransaction;
pub use ::bitcoin::{Address, Network, Transaction, Txid};
pub use ::bitcoin::{Address, AddressType, Network, Transaction, Txid};
pub use ecdsa_fun::adaptor::EncryptedSignature;
pub use ecdsa_fun::fun::Scalar;
pub use ecdsa_fun::Signature;
@ -244,10 +244,65 @@ pub fn current_epoch(
}
if tx_lock_status.is_confirmed_with(cancel_timelock) {
return ExpiredTimelocks::Cancel;
return ExpiredTimelocks::Cancel {
blocks_left: tx_cancel_status.blocks_left_until(punish_timelock),
};
}
ExpiredTimelocks::None
ExpiredTimelocks::None {
blocks_left: tx_lock_status.blocks_left_until(cancel_timelock),
}
}
pub mod bitcoin_address {
use anyhow::{bail, Result};
use serde::Serialize;
use std::str::FromStr;
#[derive(thiserror::Error, Debug, Clone, Copy, PartialEq, Serialize)]
#[error("Invalid Bitcoin address provided, expected address on network {expected:?} but address provided is on {actual:?}")]
pub struct BitcoinAddressNetworkMismatch {
#[serde(with = "crate::bitcoin::network")]
expected: bitcoin::Network,
#[serde(with = "crate::bitcoin::network")]
actual: bitcoin::Network,
}
pub fn parse(addr_str: &str) -> Result<bitcoin::Address> {
let address = bitcoin::Address::from_str(addr_str)?;
if address.address_type() != Some(bitcoin::AddressType::P2wpkh) {
anyhow::bail!("Invalid Bitcoin address provided, only bech32 format is supported!")
}
Ok(address)
}
pub fn validate(
address: bitcoin::Address,
expected_network: bitcoin::Network,
) -> Result<bitcoin::Address> {
if address.network != expected_network {
bail!(BitcoinAddressNetworkMismatch {
expected: expected_network,
actual: address.network
});
}
Ok(address)
}
pub fn validate_is_testnet(
address: bitcoin::Address,
is_testnet: bool,
) -> Result<bitcoin::Address> {
let expected_network = if is_testnet {
bitcoin::Network::Testnet
} else {
bitcoin::Network::Bitcoin
};
validate(address, expected_network)
}
}
/// Bitcoin error codes: https://github.com/bitcoin/bitcoin/blob/97d3500601c1d28642347d014a6de1e38f53ae4e/src/rpc/protocol.h#L23
@ -324,6 +379,7 @@ mod tests {
use crate::env::{GetConfig, Regtest};
use crate::protocol::{alice, bob};
use rand::rngs::OsRng;
use std::matches;
use uuid::Uuid;
#[test]
@ -338,7 +394,7 @@ mod tests {
tx_cancel_status,
);
assert_eq!(expired_timelock, ExpiredTimelocks::None)
assert!(matches!(expired_timelock, ExpiredTimelocks::None { .. }));
}
#[test]
@ -353,7 +409,7 @@ mod tests {
tx_cancel_status,
);
assert_eq!(expired_timelock, ExpiredTimelocks::Cancel)
assert!(matches!(expired_timelock, ExpiredTimelocks::Cancel { .. }));
}
#[test]

View File

@ -24,6 +24,12 @@ use std::ops::Add;
#[serde(transparent)]
pub struct CancelTimelock(u32);
impl From<CancelTimelock> for u32 {
fn from(cancel_timelock: CancelTimelock) -> Self {
cancel_timelock.0
}
}
impl CancelTimelock {
pub const fn new(number_of_blocks: u32) -> Self {
Self(number_of_blocks)
@ -64,6 +70,12 @@ impl fmt::Display for CancelTimelock {
#[serde(transparent)]
pub struct PunishTimelock(u32);
impl From<PunishTimelock> for u32 {
fn from(punish_timelock: PunishTimelock) -> Self {
punish_timelock.0
}
}
impl PunishTimelock {
pub const fn new(number_of_blocks: u32) -> Self {
Self(number_of_blocks)

View File

@ -4,9 +4,10 @@ use crate::bitcoin::{
};
use ::bitcoin::util::psbt::PartiallySignedTransaction;
use ::bitcoin::{OutPoint, TxIn, TxOut, Txid};
use anyhow::{bail, Result};
use anyhow::{bail, Context, Result};
use bdk::database::BatchDatabase;
use bdk::miniscript::Descriptor;
use bdk::psbt::PsbtUtils;
use bitcoin::{PackedLockTime, Script, Sequence};
use serde::{Deserialize, Serialize};
@ -100,6 +101,15 @@ impl TxLock {
Amount::from_sat(self.inner.clone().extract_tx().output[self.lock_output_vout()].value)
}
pub fn fee(&self) -> Result<Amount> {
Ok(Amount::from_sat(
self.inner
.clone()
.fee_amount()
.context("The PSBT is missing a TxOut for an input")?,
))
}
pub fn txid(&self) -> Txid {
self.inner.clone().extract_tx().txid()
}

View File

@ -37,9 +37,9 @@ impl Add<u32> for BlockHeight {
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Serialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExpiredTimelocks {
None,
Cancel,
None { blocks_left: u32 },
Cancel { blocks_left: u32 },
Punish,
}

View File

@ -24,6 +24,7 @@ use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{watch, Mutex};
use tracing::{debug_span, Instrument};
const SLED_TREE_NAME: &str = "default_tree";
@ -192,7 +193,7 @@ impl Wallet {
tokio::time::sleep(Duration::from_secs(5)).await;
}
});
}.instrument(debug_span!("BitcoinWalletSubscription")));
Subscription {
receiver,
@ -274,7 +275,7 @@ impl Subscription {
pub async fn wait_until_confirmed_with<T>(&self, target: T) -> Result<()>
where
u32: PartialOrd<T>,
T: Into<u32>,
T: Copy,
{
self.wait_until(|status| status.is_confirmed_with(target))
@ -933,9 +934,20 @@ impl Confirmed {
pub fn meets_target<T>(&self, target: T) -> bool
where
u32: PartialOrd<T>,
T: Into<u32>,
{
self.confirmations() >= target
self.confirmations() >= target.into()
}
pub fn blocks_left_until<T>(&self, target: T) -> u32
where
T: Into<u32> + Copy,
{
if self.meets_target(target) {
0
} else {
target.into() - self.confirmations()
}
}
}
@ -948,7 +960,7 @@ impl ScriptStatus {
/// Check if the script has met the given confirmation target.
pub fn is_confirmed_with<T>(&self, target: T) -> bool
where
u32: PartialOrd<T>,
T: Into<u32>,
{
match self {
ScriptStatus::Confirmed(inner) => inner.meets_target(target),
@ -956,6 +968,17 @@ impl ScriptStatus {
}
}
// Calculate the number of blocks left until the target is met.
pub fn blocks_left_until<T>(&self, target: T) -> u32
where
T: Into<u32> + Copy,
{
match self {
ScriptStatus::Confirmed(inner) => inner.blocks_left_until(target),
_ => target.into(),
}
}
pub fn has_been_seen(&self) -> bool {
matches!(self, ScriptStatus::InMempool | ScriptStatus::Confirmed(_))
}
@ -987,7 +1010,7 @@ mod tests {
fn given_depth_0_should_meet_confirmation_target_one() {
let script = ScriptStatus::Confirmed(Confirmed { depth: 0 });
let confirmed = script.is_confirmed_with(1);
let confirmed = script.is_confirmed_with(1_u32);
assert!(confirmed)
}
@ -996,7 +1019,7 @@ mod tests {
fn given_confirmations_1_should_meet_confirmation_target_one() {
let script = ScriptStatus::from_confirmations(1);
let confirmed = script.is_confirmed_with(1);
let confirmed = script.is_confirmed_with(1_u32);
assert!(confirmed)
}
@ -1011,6 +1034,33 @@ mod tests {
assert_eq!(confirmed.depth, 0)
}
#[test]
fn given_depth_0_should_return_0_blocks_left_until_1() {
let script = ScriptStatus::Confirmed(Confirmed { depth: 0 });
let blocks_left = script.blocks_left_until(1_u32);
assert_eq!(blocks_left, 0)
}
#[test]
fn given_depth_1_should_return_0_blocks_left_until_1() {
let script = ScriptStatus::Confirmed(Confirmed { depth: 1 });
let blocks_left = script.blocks_left_until(1_u32);
assert_eq!(blocks_left, 0)
}
#[test]
fn given_depth_0_should_return_1_blocks_left_until_2() {
let script = ScriptStatus::Confirmed(Confirmed { depth: 0 });
let blocks_left = script.blocks_left_until(2_u32);
assert_eq!(blocks_left, 1)
}
#[test]
fn given_one_BTC_and_100k_sats_per_vb_fees_should_not_hit_max() {
// 400 weight = 100 vbyte

View File

@ -10,7 +10,7 @@ use uuid::Uuid;
pub async fn cancel_and_refund(
swap_id: Uuid,
bitcoin_wallet: Arc<Wallet>,
db: Arc<dyn Database>,
db: Arc<dyn Database + Send + Sync>,
) -> Result<BobState> {
if let Err(err) = cancel(swap_id, bitcoin_wallet.clone(), db.clone()).await {
tracing::info!(%err, "Could not submit cancel transaction");
@ -28,7 +28,7 @@ pub async fn cancel_and_refund(
pub async fn cancel(
swap_id: Uuid,
bitcoin_wallet: Arc<Wallet>,
db: Arc<dyn Database>,
db: Arc<dyn Database + Send + Sync>,
) -> Result<(Txid, Subscription, BobState)> {
let state = db.get_state(swap_id).await?.try_into()?;
@ -80,7 +80,7 @@ pub async fn cancel(
pub async fn refund(
swap_id: Uuid,
bitcoin_wallet: Arc<Wallet>,
db: Arc<dyn Database>,
db: Arc<dyn Database + Send + Sync>,
) -> Result<BobState> {
let state = db.get_state(swap_id).await?.try_into()?;

File diff suppressed because it is too large Load Diff

View File

@ -151,17 +151,17 @@ impl EventLoop {
return;
}
SwarmEvent::Behaviour(OutEvent::Failure { peer, error }) => {
tracing::warn!(%peer, "Communication error: {:#}", error);
tracing::warn!(%peer, err = %error, "Communication error");
return;
}
SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } if peer_id == self.alice_peer_id => {
tracing::info!("Connected to Alice at {}", endpoint.get_remote_address());
tracing::info!(peer_id = %endpoint.get_remote_address(), "Connected to Alice");
}
SwarmEvent::Dialing(peer_id) if peer_id == self.alice_peer_id => {
tracing::debug!("Dialling Alice at {}", peer_id);
tracing::debug!(%peer_id, "Dialling Alice");
}
SwarmEvent::ConnectionClosed { peer_id, endpoint, num_established, cause: Some(error) } if peer_id == self.alice_peer_id && num_established == 0 => {
tracing::warn!("Lost connection to Alice at {}, cause: {}", endpoint.get_remote_address(), error);
tracing::warn!(peer_id = %endpoint.get_remote_address(), cause = %error, "Lost connection to Alice");
}
SwarmEvent::ConnectionClosed { peer_id, num_established, cause: None, .. } if peer_id == self.alice_peer_id && num_established == 0 => {
// no error means the disconnection was requested
@ -169,10 +169,10 @@ impl EventLoop {
return;
}
SwarmEvent::OutgoingConnectionError { peer_id, error } if matches!(peer_id, Some(alice_peer_id) if alice_peer_id == self.alice_peer_id) => {
tracing::warn!( "Failed to dial Alice: {}", error);
tracing::warn!(%error, "Failed to dial Alice");
if let Some(duration) = self.swarm.behaviour_mut().redial.until_next_redial() {
tracing::info!("Next redial attempt in {}s", duration.as_secs());
tracing::info!(seconds_until_next_redial = %duration.as_secs(), "Waiting for next redial attempt");
}
}
@ -241,6 +241,7 @@ impl EventLoopHandle {
}
pub async fn request_quote(&mut self) -> Result<BidQuote> {
tracing::debug!("Requesting quote");
Ok(self.quote.send_receive(()).await?)
}

View File

@ -1,5 +1,4 @@
use anyhow::Result;
use std::option::Option::Some;
use std::path::Path;
use time::format_description::well_known::Rfc3339;
use tracing::subscriber::set_global_default;
@ -7,55 +6,32 @@ use tracing::{Event, Level, Subscriber};
use tracing_subscriber::fmt::format::{DefaultFields, Format, JsonFields};
use tracing_subscriber::fmt::time::UtcTime;
use tracing_subscriber::layer::{Context, SubscriberExt};
use tracing_subscriber::{fmt, EnvFilter, FmtSubscriber, Layer, Registry};
use uuid::Uuid;
use tracing_subscriber::{fmt, EnvFilter, Layer, Registry};
pub fn init(debug: bool, json: bool, dir: impl AsRef<Path>, swap_id: Option<Uuid>) -> Result<()> {
if let Some(swap_id) = swap_id {
let level_filter = EnvFilter::try_new("swap=debug")?;
pub fn init(debug: bool, json: bool, dir: impl AsRef<Path>) -> Result<()> {
let level_filter = EnvFilter::try_new("swap=debug")?;
let registry = Registry::default().with(level_filter);
let registry = Registry::default().with(level_filter);
let appender = tracing_appender::rolling::never(dir.as_ref(), "swap-all.log");
let (appender, _guard) = tracing_appender::non_blocking(appender);
let appender =
tracing_appender::rolling::never(dir.as_ref(), format!("swap-{}.log", swap_id));
let (appender, guard) = tracing_appender::non_blocking(appender);
let file_logger = registry.with(
fmt::layer()
.with_ansi(false)
.with_target(false)
.json()
.with_writer(appender),
);
std::mem::forget(guard);
let file_logger = registry.with(
fmt::layer()
.with_ansi(false)
.with_target(false)
.json()
.with_writer(appender),
);
if json && debug {
set_global_default(file_logger.with(debug_json_terminal_printer()))?;
} else if json && !debug {
set_global_default(file_logger.with(info_json_terminal_printer()))?;
} else if !json && debug {
set_global_default(file_logger.with(debug_terminal_printer()))?;
} else {
set_global_default(file_logger.with(info_terminal_printer()))?;
}
if json && debug {
set_global_default(file_logger.with(debug_json_terminal_printer()))?;
} else if json && !debug {
set_global_default(file_logger.with(info_json_terminal_printer()))?;
} else if !json && debug {
set_global_default(file_logger.with(debug_terminal_printer()))?;
} else {
let level = if debug { Level::DEBUG } else { Level::INFO };
let is_terminal = atty::is(atty::Stream::Stderr);
let builder = FmtSubscriber::builder()
.with_env_filter(format!("swap={}", level))
.with_writer(std::io::stderr)
.with_ansi(is_terminal)
.with_timer(UtcTime::rfc_3339())
.with_target(false);
if json {
builder.json().init();
} else {
builder.init();
}
};
set_global_default(file_logger.with(info_terminal_printer()))?;
}
tracing::info!("Logging initialized to {}", dir.as_ref().display());
Ok(())
@ -66,19 +42,11 @@ pub struct StdErrPrinter<L> {
level: Level,
}
type StdErrLayer<S, T> = tracing_subscriber::fmt::Layer<
S,
DefaultFields,
Format<tracing_subscriber::fmt::format::Full, T>,
fn() -> std::io::Stderr,
>;
type StdErrLayer<S, T> =
fmt::Layer<S, DefaultFields, Format<fmt::format::Full, T>, fn() -> std::io::Stderr>;
type StdErrJsonLayer<S, T> = tracing_subscriber::fmt::Layer<
S,
JsonFields,
Format<tracing_subscriber::fmt::format::Json, T>,
fn() -> std::io::Stderr,
>;
type StdErrJsonLayer<S, T> =
fmt::Layer<S, JsonFields, Format<fmt::format::Json, T>, fn() -> std::io::Stderr>;
fn debug_terminal_printer<S>() -> StdErrPrinter<StdErrLayer<S, UtcTime<Rfc3339>>> {
let is_terminal = atty::is(atty::Stream::Stderr);

View File

@ -1,11 +1,12 @@
use crate::database::Swap;
use crate::monero::Address;
use crate::protocol::{Database, State};
use anyhow::{Context, Result};
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use libp2p::{Multiaddr, PeerId};
use sqlx::sqlite::Sqlite;
use sqlx::{Pool, SqlitePool};
use std::collections::HashMap;
use std::path::Path;
use std::str::FromStr;
use time::OffsetDateTime;
@ -149,7 +150,7 @@ impl Database for SqliteDatabase {
let rows = sqlx::query!(
r#"
SELECT address
SELECT DISTINCT address
FROM peer_addresses
WHERE peer_id = ?
"#,
@ -169,6 +170,25 @@ impl Database for SqliteDatabase {
addresses
}
async fn get_swap_start_date(&self, swap_id: Uuid) -> Result<String> {
let mut conn = self.pool.acquire().await?;
let swap_id = swap_id.to_string();
let row = sqlx::query!(
r#"
SELECT min(entered_at) as start_date
FROM swap_states
WHERE swap_id = ?
"#,
swap_id
)
.fetch_one(&mut conn)
.await?;
row.start_date
.ok_or_else(|| anyhow!("Could not get swap start date"))
}
async fn insert_latest_state(&self, swap_id: Uuid, state: State) -> Result<()> {
let mut conn = self.pool.acquire().await?;
let entered_at = OffsetDateTime::now_utc();
@ -249,6 +269,69 @@ impl Database for SqliteDatabase {
result
}
async fn get_states(&self, swap_id: Uuid) -> Result<Vec<State>> {
let mut conn = self.pool.acquire().await?;
let swap_id = swap_id.to_string();
// TODO: We should use query! instead of query here to allow for at-compile-time validation
// I didn't manage to generate the mappings for the query! macro because of problems with sqlx-cli
let rows = sqlx::query!(
r#"
SELECT state
FROM swap_states
WHERE swap_id = ?
"#,
swap_id
)
.fetch_all(&mut conn)
.await?;
let result = rows
.iter()
.map(|row| {
let state_str: &str = &row.state;
let state = match serde_json::from_str::<Swap>(state_str) {
Ok(a) => Ok(State::from(a)),
Err(e) => Err(e),
}?;
Ok(state)
})
.collect::<Result<Vec<State>>>();
result
}
async fn raw_all(&self) -> Result<HashMap<Uuid, Vec<serde_json::Value>>> {
let mut conn = self.pool.acquire().await?;
let rows = sqlx::query!(
r#"
SELECT swap_id, state
FROM swap_states
"#
)
.fetch_all(&mut conn)
.await?;
let mut swaps: HashMap<Uuid, Vec<serde_json::Value>> = HashMap::new();
for row in &rows {
let swap_id = Uuid::from_str(&row.swap_id)?;
let state = serde_json::from_str(&row.state)?;
if let std::collections::hash_map::Entry::Vacant(e) = swaps.entry(swap_id) {
e.insert(vec![state]);
} else {
swaps
.get_mut(&swap_id)
.ok_or_else(|| anyhow!("Error while retrieving the swap"))?
.push(state);
}
}
Ok(swaps)
}
}
#[cfg(test)]

View File

@ -16,6 +16,7 @@
missing_copy_implementations
)]
pub mod api;
pub mod asb;
pub mod bitcoin;
pub mod cli;
@ -28,6 +29,7 @@ pub mod libp2p_ext;
pub mod monero;
pub mod network;
pub mod protocol;
pub mod rpc;
pub mod seed;
pub mod tor;
pub mod tracing_ext;

View File

@ -42,6 +42,13 @@ pub fn private_key_from_secp256k1_scalar(scalar: bitcoin::Scalar) -> PrivateKey
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct PrivateViewKey(#[serde(with = "monero_private_key")] PrivateKey);
impl fmt::Display for PrivateViewKey {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// Delegate to the Display implementation of PrivateKey
write!(f, "{}", self.0)
}
}
impl PrivateViewKey {
pub fn new_random<R: RngCore + CryptoRng>(rng: &mut R) -> Self {
let scalar = Scalar::random(rng);
@ -320,6 +327,52 @@ pub mod monero_amount {
}
}
pub mod monero_address {
use anyhow::{bail, Context, Result};
use std::str::FromStr;
#[derive(thiserror::Error, Debug, Clone, Copy, PartialEq)]
#[error("Invalid monero address provided, expected address on network {expected:?} but address provided is on {actual:?}")]
pub struct MoneroAddressNetworkMismatch {
pub expected: monero::Network,
pub actual: monero::Network,
}
pub fn parse(s: &str) -> Result<monero::Address> {
monero::Address::from_str(s).with_context(|| {
format!(
"Failed to parse {} as a monero address, please make sure it is a valid address",
s
)
})
}
pub fn validate(
address: monero::Address,
expected_network: monero::Network,
) -> Result<monero::Address> {
if address.network != expected_network {
bail!(MoneroAddressNetworkMismatch {
expected: expected_network,
actual: address.network,
});
}
Ok(address)
}
pub fn validate_is_testnet(
address: monero::Address,
is_testnet: bool,
) -> Result<monero::Address> {
let expected_network = if is_testnet {
monero::Network::Stagenet
} else {
monero::Network::Mainnet
};
validate(address, expected_network)
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -11,6 +11,7 @@ use serde::{Deserialize, Serialize};
use sha2::Sha256;
use sigma_fun::ext::dl_secp256k1_ed25519_eq::{CrossCurveDLEQ, CrossCurveDLEQProof};
use sigma_fun::HashTranscript;
use std::collections::HashMap;
use std::convert::TryInto;
use uuid::Uuid;
@ -139,7 +140,10 @@ pub trait Database {
async fn get_monero_address(&self, swap_id: Uuid) -> Result<monero::Address>;
async fn insert_address(&self, peer_id: PeerId, address: Multiaddr) -> Result<()>;
async fn get_addresses(&self, peer_id: PeerId) -> Result<Vec<Multiaddr>>;
async fn get_swap_start_date(&self, swap_id: Uuid) -> Result<String>;
async fn insert_latest_state(&self, swap_id: Uuid, state: State) -> Result<()>;
async fn get_state(&self, swap_id: Uuid) -> Result<State>;
async fn get_states(&self, swap_id: Uuid) -> Result<Vec<State>>;
async fn all(&self) -> Result<Vec<(Uuid, State)>>;
async fn raw_all(&self) -> Result<HashMap<Uuid, Vec<serde_json::Value>>>;
}

View File

@ -112,7 +112,7 @@ where
}
AliceState::BtcLocked { state3 } => {
match state3.expired_timelocks(bitcoin_wallet).await? {
ExpiredTimelocks::None => {
ExpiredTimelocks::None { .. } => {
// Record the current monero wallet block height so we don't have to scan from
// block 0 for scenarios where we create a refund wallet.
let monero_wallet_restore_blockheight = monero_wallet.block_height().await?;
@ -135,7 +135,7 @@ where
transfer_proof,
state3,
} => match state3.expired_timelocks(bitcoin_wallet).await? {
ExpiredTimelocks::None => {
ExpiredTimelocks::None { .. } => {
monero_wallet
.watch_for_transfer(state3.lock_xmr_watch_request(transfer_proof.clone(), 1))
.await
@ -221,7 +221,7 @@ where
encrypted_signature,
state3,
} => match state3.expired_timelocks(bitcoin_wallet).await? {
ExpiredTimelocks::None => {
ExpiredTimelocks::None { .. } => {
let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await;
match state3.signed_redeem_transaction(*encrypted_signature) {
Ok(tx) => match bitcoin_wallet.broadcast(tx, "redeem").await {

View File

@ -21,9 +21,10 @@ use sigma_fun::ext::dl_secp256k1_ed25519_eq::CrossCurveDLEQProof;
use std::fmt;
use uuid::Uuid;
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Serialize)]
pub enum BobState {
Started {
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
btc_amount: bitcoin::Amount,
change_address: bitcoin::Address,
},
@ -287,13 +288,13 @@ pub struct State2 {
S_a_monero: monero::PublicKey,
S_a_bitcoin: bitcoin::PublicKey,
v: monero::PrivateViewKey,
xmr: monero::Amount,
cancel_timelock: CancelTimelock,
punish_timelock: PunishTimelock,
refund_address: bitcoin::Address,
pub xmr: monero::Amount,
pub cancel_timelock: CancelTimelock,
pub punish_timelock: PunishTimelock,
pub refund_address: bitcoin::Address,
redeem_address: bitcoin::Address,
punish_address: bitcoin::Address,
tx_lock: bitcoin::TxLock,
pub tx_lock: bitcoin::TxLock,
tx_cancel_sig_a: Signature,
tx_refund_encsig: bitcoin::EncryptedSignature,
min_monero_confirmations: u64,
@ -302,9 +303,9 @@ pub struct State2 {
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
tx_punish_fee: bitcoin::Amount,
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
tx_refund_fee: bitcoin::Amount,
pub tx_refund_fee: bitcoin::Amount,
#[serde(with = "::bitcoin::util::amount::serde::as_sat")]
tx_cancel_fee: bitcoin::Amount,
pub tx_cancel_fee: bitcoin::Amount,
}
impl State2 {
@ -439,7 +440,7 @@ impl State3 {
self.tx_lock.txid()
}
pub async fn current_epoch(
pub async fn expired_timelock(
&self,
bitcoin_wallet: &bitcoin::Wallet,
) -> Result<ExpiredTimelocks> {

View File

@ -117,7 +117,7 @@ async fn next_state(
} => {
let tx_lock_status = bitcoin_wallet.subscribe_to(state3.tx_lock.clone()).await;
if let ExpiredTimelocks::None = state3.current_epoch(bitcoin_wallet).await? {
if let ExpiredTimelocks::None { .. } = state3.expired_timelock(bitcoin_wallet).await? {
let transfer_proof_watcher = event_loop_handle.recv_transfer_proof();
let cancel_timelock_expires =
tx_lock_status.wait_until_confirmed_with(state3.cancel_timelock);
@ -156,7 +156,7 @@ async fn next_state(
} => {
let tx_lock_status = bitcoin_wallet.subscribe_to(state.tx_lock.clone()).await;
if let ExpiredTimelocks::None = state.current_epoch(bitcoin_wallet).await? {
if let ExpiredTimelocks::None { .. } = state.expired_timelock(bitcoin_wallet).await? {
let watch_request = state.lock_xmr_watch_request(lock_transfer_proof);
select! {
@ -185,7 +185,7 @@ async fn next_state(
BobState::XmrLocked(state) => {
let tx_lock_status = bitcoin_wallet.subscribe_to(state.tx_lock.clone()).await;
if let ExpiredTimelocks::None = state.expired_timelock(bitcoin_wallet).await? {
if let ExpiredTimelocks::None { .. } = state.expired_timelock(bitcoin_wallet).await? {
// Alice has locked Xmr
// Bob sends Alice his key
@ -209,7 +209,7 @@ async fn next_state(
BobState::EncSigSent(state) => {
let tx_lock_status = bitcoin_wallet.subscribe_to(state.tx_lock.clone()).await;
if let ExpiredTimelocks::None = state.expired_timelock(bitcoin_wallet).await? {
if let ExpiredTimelocks::None { .. } = state.expired_timelock(bitcoin_wallet).await? {
select! {
state5 = state.watch_for_redeem_btc(bitcoin_wallet) => {
BobState::BtcRedeemed(state5?)
@ -269,12 +269,12 @@ async fn next_state(
BobState::BtcCancelled(state) => {
// Bob has cancelled the swap
match state.expired_timelock(bitcoin_wallet).await? {
ExpiredTimelocks::None => {
ExpiredTimelocks::None { .. } => {
bail!(
"Internal error: canceled state reached before cancel timelock was expired"
);
}
ExpiredTimelocks::Cancel => {
ExpiredTimelocks::Cancel { .. } => {
state.publish_refund_btc(bitcoin_wallet).await?;
BobState::BtcRefunded(state)
}

31
swap/src/rpc.rs Normal file
View File

@ -0,0 +1,31 @@
use crate::api::Context;
use jsonrpsee::server::{RpcModule, ServerBuilder, ServerHandle};
use std::net::SocketAddr;
use std::sync::Arc;
use thiserror::Error;
pub mod methods;
#[derive(Debug, Error)]
pub enum Error {
#[error("Could not parse key value from params")]
ParseError,
}
pub async fn run_server(
server_address: SocketAddr,
context: Arc<Context>,
) -> anyhow::Result<(SocketAddr, ServerHandle)> {
let server = ServerBuilder::default().build(server_address).await?;
let mut modules = RpcModule::new(());
{
modules
.merge(methods::register_modules(Arc::clone(&context))?)
.expect("Could not register RPC modules")
}
let addr = server.local_addr()?;
let server_handle = server.start(modules)?;
Ok((addr, server_handle))
}

236
swap/src/rpc/methods.rs Normal file
View File

@ -0,0 +1,236 @@
use crate::api::request::{Method, Request};
use crate::api::Context;
use crate::bitcoin::bitcoin_address;
use crate::monero::monero_address;
use crate::{bitcoin, monero};
use anyhow::Result;
use jsonrpsee::server::RpcModule;
use jsonrpsee::types::Params;
use libp2p::core::Multiaddr;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use uuid::Uuid;
pub fn register_modules(context: Arc<Context>) -> Result<RpcModule<Arc<Context>>> {
let mut module = RpcModule::new(context);
module.register_async_method("suspend_current_swap", |params, context| async move {
execute_request(params, Method::SuspendCurrentSwap, &context).await
})?;
module.register_async_method("get_swap_info", |params_raw, context| async move {
let params: HashMap<String, serde_json::Value> = params_raw.parse()?;
let swap_id = params
.get("swap_id")
.ok_or_else(|| jsonrpsee_core::Error::Custom("Does not contain swap_id".to_string()))?;
let swap_id = as_uuid(swap_id)
.ok_or_else(|| jsonrpsee_core::Error::Custom("Could not parse swap_id".to_string()))?;
execute_request(params_raw, Method::GetSwapInfo { swap_id }, &context).await
})?;
module.register_async_method("get_bitcoin_balance", |params_raw, context| async move {
let params: HashMap<String, serde_json::Value> = params_raw.parse()?;
let force_refresh = params
.get("force_refresh")
.ok_or_else(|| {
jsonrpsee_core::Error::Custom("Does not contain force_refresh".to_string())
})?
.as_bool()
.ok_or_else(|| {
jsonrpsee_core::Error::Custom("force_refesh is not a boolean".to_string())
})?;
execute_request(params_raw, Method::Balance { force_refresh }, &context).await
})?;
module.register_async_method("get_history", |params, context| async move {
execute_request(params, Method::History, &context).await
})?;
module.register_async_method("get_raw_states", |params, context| async move {
execute_request(params, Method::GetRawStates, &context).await
})?;
module.register_async_method("resume_swap", |params_raw, context| async move {
let params: HashMap<String, serde_json::Value> = params_raw.parse()?;
let swap_id = params
.get("swap_id")
.ok_or_else(|| jsonrpsee_core::Error::Custom("Does not contain swap_id".to_string()))?;
let swap_id = as_uuid(swap_id)
.ok_or_else(|| jsonrpsee_core::Error::Custom("Could not parse swap_id".to_string()))?;
execute_request(params_raw, Method::Resume { swap_id }, &context).await
})?;
module.register_async_method("cancel_refund_swap", |params_raw, context| async move {
let params: HashMap<String, serde_json::Value> = params_raw.parse()?;
let swap_id = params
.get("swap_id")
.ok_or_else(|| jsonrpsee_core::Error::Custom("Does not contain swap_id".to_string()))?;
let swap_id = as_uuid(swap_id)
.ok_or_else(|| jsonrpsee_core::Error::Custom("Could not parse swap_id".to_string()))?;
execute_request(params_raw, Method::CancelAndRefund { swap_id }, &context).await
})?;
module.register_async_method(
"get_monero_recovery_info",
|params_raw, context| async move {
let params: HashMap<String, serde_json::Value> = params_raw.parse()?;
let swap_id = params.get("swap_id").ok_or_else(|| {
jsonrpsee_core::Error::Custom("Does not contain swap_id".to_string())
})?;
let swap_id = as_uuid(swap_id).ok_or_else(|| {
jsonrpsee_core::Error::Custom("Could not parse swap_id".to_string())
})?;
execute_request(params_raw, Method::MoneroRecovery { swap_id }, &context).await
},
)?;
module.register_async_method("withdraw_btc", |params_raw, context| async move {
let params: HashMap<String, String> = params_raw.parse()?;
let amount = if let Some(amount_str) = params.get("amount") {
Some(
::bitcoin::Amount::from_str_in(amount_str, ::bitcoin::Denomination::Bitcoin)
.map_err(|_| {
jsonrpsee_core::Error::Custom("Unable to parse amount".to_string())
})?,
)
} else {
None
};
let withdraw_address =
bitcoin::Address::from_str(params.get("address").ok_or_else(|| {
jsonrpsee_core::Error::Custom("Does not contain address".to_string())
})?)
.map_err(|err| jsonrpsee_core::Error::Custom(err.to_string()))?;
let withdraw_address =
bitcoin_address::validate(withdraw_address, context.config.env_config.bitcoin_network)?;
execute_request(
params_raw,
Method::WithdrawBtc {
amount,
address: withdraw_address,
},
&context,
)
.await
})?;
module.register_async_method("buy_xmr", |params_raw, context| async move {
let params: HashMap<String, String> = params_raw.parse()?;
let bitcoin_change_address =
bitcoin::Address::from_str(params.get("bitcoin_change_address").ok_or_else(|| {
jsonrpsee_core::Error::Custom("Does not contain bitcoin_change_address".to_string())
})?)
.map_err(|err| jsonrpsee_core::Error::Custom(err.to_string()))?;
let bitcoin_change_address = bitcoin_address::validate(
bitcoin_change_address,
context.config.env_config.bitcoin_network,
)?;
let monero_receive_address =
monero::Address::from_str(params.get("monero_receive_address").ok_or_else(|| {
jsonrpsee_core::Error::Custom("Does not contain monero_receiveaddress".to_string())
})?)
.map_err(|err| jsonrpsee_core::Error::Custom(err.to_string()))?;
let monero_receive_address = monero_address::validate(
monero_receive_address,
context.config.env_config.monero_network,
)?;
let seller =
Multiaddr::from_str(params.get("seller").ok_or_else(|| {
jsonrpsee_core::Error::Custom("Does not contain seller".to_string())
})?)
.map_err(|err| jsonrpsee_core::Error::Custom(err.to_string()))?;
execute_request(
params_raw,
Method::BuyXmr {
bitcoin_change_address,
monero_receive_address,
seller,
swap_id: Uuid::new_v4(),
},
&context,
)
.await
})?;
module.register_async_method("list_sellers", |params_raw, context| async move {
let params: HashMap<String, serde_json::Value> = params_raw.parse()?;
let rendezvous_point = params.get("rendezvous_point").ok_or_else(|| {
jsonrpsee_core::Error::Custom("Does not contain rendezvous_point".to_string())
})?;
let rendezvous_point = rendezvous_point
.as_str()
.and_then(|addr_str| Multiaddr::from_str(addr_str).ok())
.ok_or_else(|| {
jsonrpsee_core::Error::Custom("Could not parse valid multiaddr".to_string())
})?;
execute_request(
params_raw,
Method::ListSellers {
rendezvous_point: rendezvous_point.clone(),
},
&context,
)
.await
})?;
module.register_async_method("get_current_swap", |params, context| async move {
execute_request(params, Method::GetCurrentSwap, &context).await
})?;
Ok(module)
}
fn as_uuid(json_value: &serde_json::Value) -> Option<Uuid> {
if let Some(uuid_str) = json_value.as_str() {
Uuid::parse_str(uuid_str).ok()
} else {
None
}
}
async fn execute_request(
params: Params<'static>,
cmd: Method,
context: &Arc<Context>,
) -> Result<serde_json::Value, jsonrpsee_core::Error> {
// If we fail to parse the params as a String HashMap, it's most likely because its an empty object
// In that case, we want to make sure not to fail the request, so we set the log_reference_id to None
// and swallow the error
let reference_id = params
.parse::<HashMap<String, serde_json::Value>>()
.ok()
.and_then(|params_parsed| params_parsed.get("log_reference_id").cloned());
let request = Request::with_id(cmd, reference_id.map(|log_ref| log_ref.to_string()));
request
.call(Arc::clone(context))
.await
.map_err(|err| jsonrpsee_core::Error::Custom(format!("{:#}", err)))
}

View File

@ -16,7 +16,7 @@ use torut::onion::TorSecretKeyV3;
pub const SEED_LENGTH: usize = 32;
#[derive(Eq, PartialEq)]
#[derive(Clone, Eq, PartialEq)]
pub struct Seed([u8; SEED_LENGTH]);
impl Seed {

View File

@ -23,9 +23,9 @@ use swap::network::rendezvous::XmrBtcNamespace;
use swap::network::swarm;
use swap::protocol::alice::{AliceState, Swap};
use swap::protocol::bob::BobState;
use swap::protocol::{alice, bob};
use swap::protocol::{alice, bob, Database};
use swap::seed::Seed;
use swap::{asb, bitcoin, cli, env, monero};
use swap::{api, asb, bitcoin, cli, env, monero};
use tempfile::{tempdir, NamedTempFile};
use testcontainers::clients::Cli;
use testcontainers::{Container, RunnableImage};
@ -400,7 +400,7 @@ impl StartingBalances {
}
}
struct BobParams {
pub struct BobParams {
seed: Seed,
db_path: PathBuf,
bitcoin_wallet: Arc<bitcoin::Wallet>,
@ -411,6 +411,21 @@ struct BobParams {
}
impl BobParams {
pub fn get_concentenated_alice_address(&self) -> String {
format!(
"{}/p2p/{}",
self.alice_address.clone(),
self.alice_peer_id.clone().to_base58()
)
}
pub async fn get_change_receive_addresses(&self) -> (bitcoin::Address, monero::Address) {
(
self.bitcoin_wallet.new_address().await.unwrap(),
self.monero_wallet.get_main_address(),
)
}
pub async fn new_swap_from_db(&self, swap_id: Uuid) -> Result<(bob::Swap, cli::EventLoop)> {
let (event_loop, handle) = self.new_eventloop(swap_id).await?;
@ -452,6 +467,8 @@ impl BobParams {
}
let db = Arc::new(SqliteDatabase::open(&self.db_path).await?);
db.insert_peer_id(swap_id, self.alice_peer_id).await?;
let swap = bob::Swap::new(
db,
swap_id,
@ -525,13 +542,24 @@ pub struct TestContext {
alice_swap_handle: mpsc::Receiver<Swap>,
alice_handle: AliceApplicationHandle,
bob_params: BobParams,
pub bob_params: BobParams,
bob_starting_balances: StartingBalances,
bob_bitcoin_wallet: Arc<bitcoin::Wallet>,
bob_monero_wallet: Arc<monero::Wallet>,
}
impl TestContext {
pub async fn get_bob_context(self) -> api::Context {
api::Context::for_harness(
self.bob_params.seed,
self.env_config,
self.bob_params.db_path,
self.bob_bitcoin_wallet,
self.bob_monero_wallet,
)
.await
}
pub async fn restart_alice(&mut self) {
self.alice_handle.abort();

436
swap/tests/rpc.rs Normal file
View File

@ -0,0 +1,436 @@
pub mod harness;
#[cfg(test)]
mod test {
use anyhow::Result;
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee_core::client::{Client, ClientT};
use jsonrpsee_core::params::ObjectParams;
use serial_test::serial;
use serde_json::Value;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use swap::api::request::{Method, Request};
use swap::api::Context;
use crate::harness::alice_run_until::is_xmr_lock_transaction_sent;
use crate::harness::bob_run_until::is_btc_locked;
use crate::harness::{setup_test, SlowCancelConfig, TestContext};
use swap::asb::FixedRate;
use swap::protocol::{alice, bob};
use swap::tracing_ext::{capture_logs, MakeCapturingWriter};
use tracing_subscriber::filter::LevelFilter;
use uuid::Uuid;
const SERVER_ADDRESS: &str = "127.0.0.1:1234";
const SERVER_START_TIMEOUT_SECS: u64 = 50;
const BITCOIN_ADDR: &str = "bcrt1qahvhjfc7vx5857zf8knxs8yp5lkm26jgyt0k76";
const MONERO_ADDR: &str = "53gEuGZUhP9JMEBZoGaFNzhwEgiG7hwQdMCqFxiyiTeFPmkbt1mAoNybEUvYBKHcnrSgxnVWgZsTvRBaHBNXPa8tHiCU51a";
const SELLER: &str =
"/ip4/127.0.0.1/tcp/9939/p2p/12D3KooWCdMKjesXMJz1SiZ7HgotrxuqhQJbP5sgBm2BwP1cqThi";
const SWAP_ID: &str = "ea030832-3be9-454f-bb98-5ea9a788406b";
pub async fn setup_daemon(
harness_ctx: TestContext,
) -> (Client, MakeCapturingWriter, Arc<Context>) {
let writer = capture_logs(LevelFilter::DEBUG);
let server_address: SocketAddr = SERVER_ADDRESS.parse().unwrap();
let request = Request::new(Method::StartDaemon {
server_address: Some(server_address),
});
let context = Arc::new(harness_ctx.get_bob_context().await);
let context_clone = context.clone();
tokio::spawn(async move {
if let Err(err) = request.call(context_clone).await {
println!("Failed to initialize daemon for testing: {}", err);
}
});
for _ in 0..SERVER_START_TIMEOUT_SECS {
if writer.captured().contains("Started RPC server") {
let url = format!("ws://{}", SERVER_ADDRESS);
let client = WsClientBuilder::default().build(&url).await.unwrap();
return (client, writer, context);
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
panic!(
"Failed to start RPC server after {} seconds",
SERVER_START_TIMEOUT_SECS
);
}
fn assert_has_keys_serde(map: &serde_json::Map<String, Value>, keys: &[&str]) {
for &key in keys {
assert!(map.contains_key(key), "Key {} is missing", key);
}
}
// Helper function for HashMap
fn assert_has_keys_hashmap<T>(map: &HashMap<String, T>, keys: &[&str]) {
for &key in keys {
assert!(map.contains_key(key), "Key {} is missing", key);
}
}
#[tokio::test]
#[serial]
pub async fn get_swap_info() {
setup_test(SlowCancelConfig, |mut harness_ctx| async move {
// Start a swap and wait for xmr lock transaction to be published (XmrLockTransactionSent)
let (bob_swap, _) = harness_ctx.bob_swap().await;
let bob_swap_id = bob_swap.id;
tokio::spawn(bob::run_until(bob_swap, is_btc_locked));
let alice_swap = harness_ctx.alice_next_swap().await;
alice::run_until(
alice_swap,
is_xmr_lock_transaction_sent,
FixedRate::default(),
)
.await?;
let (client, _, _) = setup_daemon(harness_ctx).await;
let response: HashMap<String, Vec<(Uuid, String)>> = client
.request("get_history", ObjectParams::new())
.await
.unwrap();
let swaps: Vec<(Uuid, String)> = vec![(bob_swap_id, "btc is locked".to_string())];
assert_eq!(response, HashMap::from([("swaps".to_string(), swaps)]));
let response: HashMap<String, HashMap<Uuid, Vec<Value>>> = client
.request("get_raw_states", ObjectParams::new())
.await
.unwrap();
let response_raw_states = response.get("raw_states").unwrap();
assert!(response_raw_states.contains_key(&bob_swap_id));
assert_eq!(response_raw_states.get(&bob_swap_id).unwrap().len(), 2);
let mut params = ObjectParams::new();
params.insert("swap_id", bob_swap_id).unwrap();
let response: HashMap<String, Value> =
client.request("get_swap_info", params).await.unwrap();
// Check primary keys in response
assert_has_keys_hashmap(
&response,
&[
"txRefundFee",
"swapId",
"cancelTimelock",
"timelock",
"punishTimelock",
"stateName",
"btcAmount",
"startDate",
"btcRefundAddress",
"txCancelFee",
"xmrAmount",
"completed",
"txLockId",
"seller",
],
);
// Assert specific fields
assert_eq!(response.get("swapId").unwrap(), &bob_swap_id.to_string());
assert_eq!(
response.get("stateName").unwrap(),
&"btc is locked".to_string()
);
assert_eq!(response.get("completed").unwrap(), &Value::Bool(false));
// Check seller object and its keys
let seller = response
.get("seller")
.expect("Field 'seller' is missing from response")
.as_object()
.expect("'seller' is not an object");
assert_has_keys_serde(seller, &["peerId"]);
// Check timelock object, nested 'None' object, and blocks_left
let timelock = response
.get("timelock")
.expect("Field 'timelock' is missing from response")
.as_object()
.expect("'timelock' is not an object");
let none_obj = timelock
.get("None")
.expect("Field 'None' is missing from 'timelock'")
.as_object()
.expect("'None' is not an object in 'timelock'");
let blocks_left = none_obj
.get("blocks_left")
.expect("Field 'blocks_left' is missing from 'None'")
.as_i64()
.expect("'blocks_left' is not an integer");
// Validate blocks_left
assert!(
blocks_left > 0 && blocks_left <= 180,
"Field 'blocks_left' should be > 0 and <= 180 but got {}",
blocks_left
);
Ok(())
})
.await;
}
#[tokio::test]
#[serial]
pub async fn test_rpc_calls() {
setup_test(SlowCancelConfig, |harness_ctx| async move {
let alice_addr = harness_ctx.bob_params.get_concentenated_alice_address();
let (change_address, receive_address) =
harness_ctx.bob_params.get_change_receive_addresses().await;
let (client, writer, _) = setup_daemon(harness_ctx).await;
assert!(client.is_connected());
let mut params = ObjectParams::new();
params.insert("force_refresh", false).unwrap();
let response: HashMap<String, i32> = client
.request("get_bitcoin_balance", params)
.await
.unwrap();
assert_eq!(response, HashMap::from([("balance".to_string(), 10000000)]));
let mut params = ObjectParams::new();
params.insert("log_reference_id", "test_ref_id").unwrap();
params.insert("force_refresh", false).unwrap();
let _: HashMap<String, i32> = client.request("get_bitcoin_balance", params).await.unwrap();
assert!(writer.captured().contains(
r#"method{method_name="Balance" log_reference_id="\"test_ref_id\""}: swap::api::request: Current Bitcoin balance as of last sync balance=0.1 BTC"#
));
for method in ["get_swap_info", "resume_swap", "cancel_refund_swap"].iter() {
let mut params = ObjectParams::new();
params.insert("swap_id", "invalid_swap").unwrap();
let response: Result<HashMap<String, String>, _> =
client.request(method, params).await;
response.expect_err(&format!(
"Expected an error when swap_id is invalid for method {}",
method
));
let params = ObjectParams::new();
let response: Result<HashMap<String, String>, _> =
client.request(method, params).await;
response.expect_err(&format!(
"Expected an error when swap_id is missing for method {}",
method
));
}
let params = ObjectParams::new();
let result: Result<HashMap<String, String>, _> =
client.request("list_sellers", params).await;
result.expect_err("Expected an error when rendezvous_point is missing");
let params = ObjectParams::new();
let result: Result<HashMap<String, String>, _> =
client.request("list_sellers", params).await;
result.expect_err("Expected an error when rendezvous_point is missing");
let params = ObjectParams::new();
let response: Result<HashMap<String, String>, _> =
client.request("withdraw_btc", params).await;
response.expect_err("Expected an error when withdraw_address is missing");
let mut params = ObjectParams::new();
params.insert("address", "invalid_address").unwrap();
let response: Result<HashMap<String, String>, _> =
client.request("withdraw_btc", params).await;
response.expect_err("Expected an error when withdraw_address is malformed");
let mut params = ObjectParams::new();
params.insert("address", BITCOIN_ADDR).unwrap();
params.insert("amount", "0").unwrap();
let response: Result<HashMap<String, String>, _> =
client.request("withdraw_btc", params).await;
response.expect_err("Expected an error when amount is 0");
let mut params = ObjectParams::new();
params
.insert("address", BITCOIN_ADDR)
.unwrap();
params.insert("amount", "0.01").unwrap();
let response: HashMap<String, Value> = client
.request("withdraw_btc", params)
.await
.expect("Expected a valid response");
assert_has_keys_hashmap(&response, &["signed_tx", "amount", "txid"]);
assert_eq!(
response.get("amount").unwrap().as_u64().unwrap(),
1_000_000
);
let params = ObjectParams::new();
let response: Result<HashMap<String, String>, _> =
client.request("buy_xmr", params).await;
response.expect_err("Expected an error when no params are given");
let mut params = ObjectParams::new();
params
.insert("bitcoin_change_address", BITCOIN_ADDR)
.unwrap();
params
.insert("monero_receive_address", MONERO_ADDR)
.unwrap();
let response: Result<HashMap<String, String>, _> =
client.request("buy_xmr", params).await;
response.expect_err("Expected an error when seller is missing");
let mut params = ObjectParams::new();
params
.insert("bitcoin_change_address", BITCOIN_ADDR)
.unwrap();
params.insert("seller", SELLER).unwrap();
let response: Result<HashMap<String, String>, _> =
client.request("buy_xmr", params).await;
response.expect_err("Expected an error when monero_receive_address is missing");
let mut params = ObjectParams::new();
params
.insert("monero_receive_address", MONERO_ADDR)
.unwrap();
params.insert("seller", SELLER).unwrap();
let response: Result<HashMap<String, String>, _> =
client.request("buy_xmr", params).await;
response.expect_err("Expected an error when bitcoin_change_address is missing");
let mut params = ObjectParams::new();
params
.insert("bitcoin_change_address", "invalid_address")
.unwrap();
params
.insert("monero_receive_address", MONERO_ADDR)
.unwrap();
params.insert("seller", SELLER).unwrap();
let response: Result<HashMap<String, String>, _> =
client.request("buy_xmr", params).await;
response.expect_err("Expected an error when bitcoin_change_address is malformed");
let mut params = ObjectParams::new();
params
.insert("bitcoin_change_address", BITCOIN_ADDR)
.unwrap();
params
.insert("monero_receive_address", "invalid_address")
.unwrap();
params.insert("seller", SELLER).unwrap();
let response: Result<HashMap<String, String>, _> =
client.request("buy_xmr", params).await;
response.expect_err("Expected an error when monero_receive_address is malformed");
let mut params = ObjectParams::new();
params
.insert("bitcoin_change_address", BITCOIN_ADDR)
.unwrap();
params
.insert("monero_receive_address", MONERO_ADDR)
.unwrap();
params.insert("seller", "invalid_seller").unwrap();
let response: Result<HashMap<String, String>, _> =
client.request("buy_xmr", params).await;
response.expect_err("Expected an error when seller is malformed");
let response: Result<HashMap<String, String>, _> = client
.request("suspend_current_swap", ObjectParams::new())
.await;
response.expect_err("Expected an error when no swap is running");
let mut params = ObjectParams::new();
params
.insert("bitcoin_change_address", change_address)
.unwrap();
params
.insert("monero_receive_address", receive_address)
.unwrap();
params.insert("seller", alice_addr).unwrap();
let response: HashMap<String, Value> = client
.request("buy_xmr", params)
.await
.expect("Expected a HashMap, got an error");
assert_has_keys_hashmap(&response, &["swapId"]);
Ok(())
})
.await;
}
#[tokio::test]
#[serial]
pub async fn suspend_current_swap_swap_running() {
setup_test(SlowCancelConfig, |harness_ctx| async move {
let (client, _, ctx) = setup_daemon(harness_ctx).await;
ctx.swap_lock
.acquire_swap_lock(Uuid::parse_str(SWAP_ID).unwrap())
.await
.unwrap();
let cloned_ctx = ctx.clone();
tokio::spawn(async move {
// Immediately release lock when suspend signal is received. Mocks a running swap that is then cancelled.
ctx.swap_lock
.listen_for_swap_force_suspension()
.await
.unwrap();
ctx.swap_lock.release_swap_lock().await.unwrap();
});
let response: HashMap<String, String> = client
.request("suspend_current_swap", ObjectParams::new())
.await
.unwrap();
assert_eq!(
response,
HashMap::from([("swapId".to_string(), SWAP_ID.to_string())])
);
cloned_ctx
.swap_lock
.acquire_swap_lock(Uuid::parse_str(SWAP_ID).unwrap())
.await
.unwrap();
let response: Result<HashMap<String, String>, _> = client
.request("suspend_current_swap", ObjectParams::new())
.await;
response.expect_err("Expected an error when suspend signal times out");
Ok(())
})
.await;
}
}