mirror of
https://github.com/comit-network/xmr-btc-swap.git
synced 2025-08-08 22:42:35 -04:00
Add the list-sellers
command to the CLI
This command uses a rendezvous node to find sellers (i.e. ASBs) and query them for quotes. Sellers, that can be dialed and queried for a quote will be listed.
This commit is contained in:
parent
f45cde84ab
commit
ff10edd8a4
13 changed files with 800 additions and 247 deletions
|
@ -1,5 +1,6 @@
|
|||
use crate::env::GetConfig;
|
||||
use crate::fs::system_data_dir;
|
||||
use crate::network::rendezvous::{XmrBtcNamespace, DEFAULT_RENDEZVOUS_ADDRESS};
|
||||
use crate::{env, monero};
|
||||
use anyhow::{Context, Result};
|
||||
use libp2p::core::Multiaddr;
|
||||
|
@ -188,6 +189,20 @@ where
|
|||
bitcoin_target_block: bitcoin_target_block_from(bitcoin_target_block, is_testnet),
|
||||
},
|
||||
},
|
||||
RawCommand::ListSellers {
|
||||
rendezvous_node_addr,
|
||||
tor: Tor { tor_socks5_port },
|
||||
} => Arguments {
|
||||
env_config: env_config_from(is_testnet),
|
||||
debug,
|
||||
json,
|
||||
data_dir: data::data_dir_from(data, is_testnet)?,
|
||||
cmd: Command::ListSellers {
|
||||
rendezvous_node_addr,
|
||||
namespace: rendezvous_namespace_from(is_testnet),
|
||||
tor_socks5_port,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
Ok(ParseResult::Arguments(arguments))
|
||||
|
@ -224,6 +239,11 @@ pub enum Command {
|
|||
bitcoin_electrum_rpc_url: Url,
|
||||
bitcoin_target_block: usize,
|
||||
},
|
||||
ListSellers {
|
||||
rendezvous_node_addr: Multiaddr,
|
||||
namespace: XmrBtcNamespace,
|
||||
tor_socks5_port: u16,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(structopt::StructOpt, Debug)]
|
||||
|
@ -311,6 +331,17 @@ pub enum RawCommand {
|
|||
#[structopt(flatten)]
|
||||
bitcoin: Bitcoin,
|
||||
},
|
||||
ListSellers {
|
||||
#[structopt(
|
||||
long,
|
||||
help = "The multiaddr (including peer-id) of a rendezvous node that sellers register with",
|
||||
default_value = DEFAULT_RENDEZVOUS_ADDRESS
|
||||
)]
|
||||
rendezvous_node_addr: Multiaddr,
|
||||
|
||||
#[structopt(flatten)]
|
||||
tor: Tor,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(structopt::StructOpt, Debug)]
|
||||
|
@ -406,6 +437,14 @@ fn bitcoin_electrum_rpc_url_from(url: Option<Url>, testnet: bool) -> Result<Url>
|
|||
}
|
||||
}
|
||||
|
||||
fn rendezvous_namespace_from(is_testnet: bool) -> XmrBtcNamespace {
|
||||
if is_testnet {
|
||||
XmrBtcNamespace::Testnet
|
||||
} else {
|
||||
XmrBtcNamespace::Mainnet
|
||||
}
|
||||
}
|
||||
|
||||
fn bitcoin_target_block_from(target_block: Option<usize>, testnet: bool) -> usize {
|
||||
if let Some(target_block) = target_block {
|
||||
target_block
|
||||
|
|
|
@ -94,7 +94,7 @@ impl EventLoop {
|
|||
loop {
|
||||
// Note: We are making very elaborate use of `select!` macro's feature here. Make sure to read the documentation thoroughly: https://docs.rs/tokio/1.4.0/tokio/macro.select.html
|
||||
tokio::select! {
|
||||
swarm_event = self.swarm.next_event().fuse() => {
|
||||
swarm_event = self.swarm.select_next_some() => {
|
||||
match swarm_event {
|
||||
SwarmEvent::Behaviour(OutEvent::QuoteReceived { id, response }) => {
|
||||
if let Some(responder) = self.inflight_quote_requests.remove(&id) {
|
||||
|
|
273
swap/src/cli/list_sellers.rs
Normal file
273
swap/src/cli/list_sellers.rs
Normal file
|
@ -0,0 +1,273 @@
|
|||
use crate::network::quote::BidQuote;
|
||||
use crate::network::rendezvous::XmrBtcNamespace;
|
||||
use crate::network::{quote, swarm};
|
||||
use anyhow::Result;
|
||||
use futures::StreamExt;
|
||||
use libp2p::multiaddr::Protocol;
|
||||
use libp2p::ping::{Ping, PingConfig, PingEvent};
|
||||
use libp2p::rendezvous::{Namespace, Rendezvous};
|
||||
use libp2p::request_response::{RequestResponseEvent, RequestResponseMessage};
|
||||
use libp2p::swarm::SwarmEvent;
|
||||
use libp2p::{identity, rendezvous, Multiaddr, PeerId, Swarm};
|
||||
use serde::Serialize;
|
||||
use serde_with::{serde_as, DisplayFromStr};
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
|
||||
pub async fn list_sellers(
|
||||
rendezvous_node_peer_id: PeerId,
|
||||
rendezvous_node_addr: Multiaddr,
|
||||
namespace: XmrBtcNamespace,
|
||||
tor_socks5_port: u16,
|
||||
identity: identity::Keypair,
|
||||
) -> Result<Vec<Seller>> {
|
||||
let behaviour = Behaviour {
|
||||
rendezvous: Rendezvous::new(identity.clone(), rendezvous::Config::default()),
|
||||
quote: quote::cli(),
|
||||
ping: Ping::new(
|
||||
PingConfig::new()
|
||||
.with_keep_alive(false)
|
||||
.with_interval(Duration::from_secs(86_400)),
|
||||
),
|
||||
};
|
||||
let mut swarm = swarm::cli(identity, tor_socks5_port, behaviour).await?;
|
||||
|
||||
let _ = swarm.dial_addr(rendezvous_node_addr.clone());
|
||||
|
||||
let event_loop = EventLoop::new(
|
||||
swarm,
|
||||
rendezvous_node_peer_id,
|
||||
rendezvous_node_addr,
|
||||
namespace,
|
||||
);
|
||||
let sellers = event_loop.run().await;
|
||||
|
||||
Ok(sellers)
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct Seller {
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub multiaddr: Multiaddr,
|
||||
pub quote: BidQuote,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum OutEvent {
|
||||
Rendezvous(rendezvous::Event),
|
||||
Quote(quote::OutEvent),
|
||||
Ping(PingEvent),
|
||||
}
|
||||
|
||||
impl From<rendezvous::Event> for OutEvent {
|
||||
fn from(event: rendezvous::Event) -> Self {
|
||||
OutEvent::Rendezvous(event)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<quote::OutEvent> for OutEvent {
|
||||
fn from(event: quote::OutEvent) -> Self {
|
||||
OutEvent::Quote(event)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(libp2p::NetworkBehaviour)]
|
||||
#[behaviour(event_process = false)]
|
||||
#[behaviour(out_event = "OutEvent")]
|
||||
struct Behaviour {
|
||||
rendezvous: Rendezvous,
|
||||
quote: quote::Behaviour,
|
||||
ping: Ping,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum QuoteStatus {
|
||||
Pending,
|
||||
Received(BidQuote),
|
||||
}
|
||||
|
||||
enum State {
|
||||
WaitForDiscovery,
|
||||
WaitForQuoteCompletion,
|
||||
}
|
||||
|
||||
struct EventLoop {
|
||||
swarm: Swarm<Behaviour>,
|
||||
rendezvous_peer_id: PeerId,
|
||||
rendezvous_addr: Multiaddr,
|
||||
namespace: XmrBtcNamespace,
|
||||
asb_address: HashMap<PeerId, Multiaddr>,
|
||||
asb_quote_status: HashMap<PeerId, QuoteStatus>,
|
||||
state: State,
|
||||
}
|
||||
|
||||
impl EventLoop {
|
||||
fn new(
|
||||
swarm: Swarm<Behaviour>,
|
||||
rendezvous_peer_id: PeerId,
|
||||
rendezvous_addr: Multiaddr,
|
||||
namespace: XmrBtcNamespace,
|
||||
) -> Self {
|
||||
Self {
|
||||
swarm,
|
||||
rendezvous_peer_id,
|
||||
rendezvous_addr,
|
||||
namespace,
|
||||
asb_address: Default::default(),
|
||||
asb_quote_status: Default::default(),
|
||||
state: State::WaitForDiscovery,
|
||||
}
|
||||
}
|
||||
|
||||
async fn run(mut self) -> Vec<Seller> {
|
||||
loop {
|
||||
tokio::select! {
|
||||
swarm_event = self.swarm.select_next_some() => {
|
||||
match swarm_event {
|
||||
SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => {
|
||||
if peer_id == self.rendezvous_peer_id{
|
||||
tracing::info!(
|
||||
"Connected to rendezvous point, discovering nodes in '{}' namespace ...",
|
||||
self.namespace
|
||||
);
|
||||
|
||||
self.swarm.behaviour_mut().rendezvous.discover(
|
||||
Some(Namespace::new(self.namespace.to_string()).expect("our namespace to be a correct string")),
|
||||
None,
|
||||
None,
|
||||
self.rendezvous_peer_id,
|
||||
);
|
||||
} else {
|
||||
let address = endpoint.get_remote_address();
|
||||
self.asb_address.insert(peer_id, address.clone());
|
||||
}
|
||||
}
|
||||
SwarmEvent::UnreachableAddr { peer_id, error, address, .. } => {
|
||||
if address == self.rendezvous_addr {
|
||||
tracing::error!(
|
||||
"Failed to connect to rendezvous point at {}: {}",
|
||||
address,
|
||||
error
|
||||
);
|
||||
|
||||
// if the rendezvous node is unreachable we just stop
|
||||
return Vec::new();
|
||||
} else {
|
||||
tracing::debug!(
|
||||
"Failed to connect to peer at {}: {}",
|
||||
address,
|
||||
error
|
||||
);
|
||||
|
||||
// if a different peer than the rendezvous node is unreachable (i.e. a seller) we remove that seller from the quote status state
|
||||
self.asb_quote_status.remove(&peer_id);
|
||||
}
|
||||
}
|
||||
SwarmEvent::Behaviour(OutEvent::Rendezvous(
|
||||
rendezvous::Event::Discovered { registrations, .. },
|
||||
)) => {
|
||||
self.state = State::WaitForQuoteCompletion;
|
||||
|
||||
for registration in registrations {
|
||||
let peer = registration.record.peer_id();
|
||||
for address in registration.record.addresses() {
|
||||
tracing::info!("Discovered peer {} at {}", peer, address);
|
||||
|
||||
let p2p_suffix = Protocol::P2p(*peer.as_ref());
|
||||
let _address_with_p2p = if !address
|
||||
.ends_with(&Multiaddr::empty().with(p2p_suffix.clone()))
|
||||
{
|
||||
address.clone().with(p2p_suffix)
|
||||
} else {
|
||||
address.clone()
|
||||
};
|
||||
|
||||
self.asb_quote_status.insert(peer, QuoteStatus::Pending);
|
||||
|
||||
// add all external addresses of that peer to the quote behaviour
|
||||
self.swarm.behaviour_mut().quote.add_address(&peer, address.clone());
|
||||
}
|
||||
|
||||
// request the quote, if we are not connected to the peer it will be dialed automatically
|
||||
let _request_id = self.swarm.behaviour_mut().quote.send_request(&peer, ());
|
||||
}
|
||||
}
|
||||
SwarmEvent::Behaviour(OutEvent::Quote(quote_response)) => {
|
||||
match quote_response {
|
||||
RequestResponseEvent::Message { peer, message } => {
|
||||
match message {
|
||||
RequestResponseMessage::Response { response, .. } => {
|
||||
if self.asb_quote_status.insert(peer, QuoteStatus::Received(response)).is_none() {
|
||||
tracing::error!(%peer, "Received bid quote from unexpected peer, this record will be removed!");
|
||||
self.asb_quote_status.remove(&peer);
|
||||
}
|
||||
}
|
||||
RequestResponseMessage::Request { .. } => unreachable!()
|
||||
}
|
||||
}
|
||||
RequestResponseEvent::OutboundFailure { peer, error, .. } => {
|
||||
if peer == self.rendezvous_peer_id {
|
||||
tracing::debug!(%peer, "Outbound failure when communicating with rendezvous node: {:#}", error);
|
||||
} else {
|
||||
tracing::debug!(%peer, "Ignoring seller, because unable to request quote: {:#}", error);
|
||||
self.asb_quote_status.remove(&peer);
|
||||
}
|
||||
}
|
||||
RequestResponseEvent::InboundFailure { peer, error, .. } => {
|
||||
if peer == self.rendezvous_peer_id {
|
||||
tracing::debug!(%peer, "Inbound failure when communicating with rendezvous node: {:#}", error);
|
||||
} else {
|
||||
tracing::debug!(%peer, "Ignoring seller, because unable to request quote: {:#}", error);
|
||||
self.asb_quote_status.remove(&peer);
|
||||
}
|
||||
},
|
||||
RequestResponseEvent::ResponseSent { .. } => unreachable!()
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
match self.state {
|
||||
State::WaitForDiscovery => {
|
||||
continue;
|
||||
}
|
||||
State::WaitForQuoteCompletion => {
|
||||
let all_quotes_fetched = self
|
||||
.asb_quote_status
|
||||
.iter()
|
||||
.map(|(peer_id, quote_status)| match quote_status {
|
||||
QuoteStatus::Pending => Err(StillPending {}),
|
||||
QuoteStatus::Received(quote) => {
|
||||
let address = self
|
||||
.asb_address
|
||||
.get(&peer_id)
|
||||
.expect("if we got a quote we must have stored an address");
|
||||
|
||||
Ok(Seller {
|
||||
multiaddr: address.clone(),
|
||||
quote: *quote,
|
||||
})
|
||||
}
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>();
|
||||
|
||||
match all_quotes_fetched {
|
||||
Ok(sellers) => break sellers,
|
||||
Err(StillPending {}) => continue,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct StillPending {}
|
||||
|
||||
impl From<PingEvent> for OutEvent {
|
||||
fn from(event: PingEvent) -> Self {
|
||||
OutEvent::Ping(event)
|
||||
}
|
||||
}
|
|
@ -1,4 +1,5 @@
|
|||
use anyhow::Result;
|
||||
use std::option::Option::Some;
|
||||
use std::path::Path;
|
||||
use tracing::subscriber::set_global_default;
|
||||
use tracing::{Event, Level, Subscriber};
|
||||
|
@ -8,7 +9,7 @@ use tracing_subscriber::layer::{Context, SubscriberExt};
|
|||
use tracing_subscriber::{fmt, EnvFilter, FmtSubscriber, Layer, Registry};
|
||||
use uuid::Uuid;
|
||||
|
||||
pub fn init(debug: bool, json: bool, dir: impl AsRef<Path>, swap_id: Uuid) -> Result<()> {
|
||||
pub fn init(debug: bool, json: bool, dir: impl AsRef<Path>, swap_id: Option<Uuid>) -> Result<()> {
|
||||
if json {
|
||||
let level = if debug { Level::DEBUG } else { Level::INFO };
|
||||
|
||||
|
@ -24,7 +25,7 @@ pub fn init(debug: bool, json: bool, dir: impl AsRef<Path>, swap_id: Uuid) -> Re
|
|||
.init();
|
||||
|
||||
Ok(())
|
||||
} else {
|
||||
} else if let Some(swap_id) = swap_id {
|
||||
let level_filter = EnvFilter::try_new("swap=debug")?;
|
||||
|
||||
let registry = Registry::default().with(level_filter);
|
||||
|
@ -45,6 +46,19 @@ pub fn init(debug: bool, json: bool, dir: impl AsRef<Path>, swap_id: Uuid) -> Re
|
|||
set_global_default(registry.with(file_logger).with(info_terminal_printer()))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
} else {
|
||||
let level = if debug { Level::DEBUG } else { Level::INFO };
|
||||
let is_terminal = atty::is(atty::Stream::Stderr);
|
||||
|
||||
FmtSubscriber::builder()
|
||||
.with_env_filter(format!("swap={}", level))
|
||||
.with_writer(std::io::stderr)
|
||||
.with_ansi(is_terminal)
|
||||
.with_timer(ChronoLocal::with_format("%F %T".to_owned()))
|
||||
.with_target(false)
|
||||
.init();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue