mirror of
https://github.com/comit-network/xmr-btc-swap.git
synced 2025-12-19 10:38:03 -05:00
refactor(monero-rpc-pool): use u16 for ports
This commit is contained in:
parent
35414d15af
commit
fe5e58a773
4 changed files with 38 additions and 43 deletions
|
|
@ -38,7 +38,7 @@ use axum::body::Body;
|
||||||
use tokio::sync::{Mutex, OwnedMutexGuard, RwLock};
|
use tokio::sync::{Mutex, OwnedMutexGuard, RwLock};
|
||||||
|
|
||||||
/// Key for the map – `(scheme, host, port, via_tor)`.
|
/// Key for the map – `(scheme, host, port, via_tor)`.
|
||||||
pub type StreamKey = (String, String, i64, bool);
|
pub type StreamKey = (String, String, u16, bool);
|
||||||
|
|
||||||
/// Alias for hyper's HTTP/1 sender.
|
/// Alias for hyper's HTTP/1 sender.
|
||||||
pub type HttpSender = hyper::client::conn::http1::SendRequest<Body>;
|
pub type HttpSender = hyper::client::conn::http1::SendRequest<Body>;
|
||||||
|
|
|
||||||
|
|
@ -66,7 +66,7 @@ impl Database {
|
||||||
&self,
|
&self,
|
||||||
scheme: &str,
|
scheme: &str,
|
||||||
host: &str,
|
host: &str,
|
||||||
port: i64,
|
port: u16,
|
||||||
was_successful: bool,
|
was_successful: bool,
|
||||||
latency_ms: Option<f64>,
|
latency_ms: Option<f64>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
|
|
|
||||||
|
|
@ -122,7 +122,7 @@ impl NodePool {
|
||||||
&self,
|
&self,
|
||||||
scheme: &str,
|
scheme: &str,
|
||||||
host: &str,
|
host: &str,
|
||||||
port: i64,
|
port: u16,
|
||||||
latency_ms: f64,
|
latency_ms: f64,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
self.db
|
self.db
|
||||||
|
|
@ -131,7 +131,7 @@ impl NodePool {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn record_failure(&self, scheme: &str, host: &str, port: i64) -> Result<()> {
|
pub async fn record_failure(&self, scheme: &str, host: &str, port: u16) -> Result<()> {
|
||||||
self.db
|
self.db
|
||||||
.record_health_check(scheme, host, port, false, None)
|
.record_health_check(scheme, host, port, false, None)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
|
||||||
|
|
@ -40,9 +40,9 @@ pub async fn proxy_handler(State(state): State<AppState>, request: Request) -> R
|
||||||
.await
|
.await
|
||||||
.map_err(|e| HandlerError::PoolError(e.to_string()))
|
.map_err(|e| HandlerError::PoolError(e.to_string()))
|
||||||
.map(|nodes| {
|
.map(|nodes| {
|
||||||
let pool: Vec<(String, String, i64)> = nodes
|
let pool: Vec<(String, String, u16)> = nodes
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|node| (node.scheme, node.host, node.port as i64))
|
.map(|node| (node.scheme, node.host, node.port))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
pool
|
pool
|
||||||
|
|
@ -82,17 +82,17 @@ pub async fn proxy_handler(State(state): State<AppState>, request: Request) -> R
|
||||||
async fn proxy_to_multiple_nodes(
|
async fn proxy_to_multiple_nodes(
|
||||||
state: &AppState,
|
state: &AppState,
|
||||||
request: CloneableRequest,
|
request: CloneableRequest,
|
||||||
nodes: Vec<(String, String, i64)>,
|
nodes: Vec<(String, String, u16)>,
|
||||||
) -> Result<Response, HandlerError> {
|
) -> Result<Response, HandlerError> {
|
||||||
if nodes.is_empty() {
|
if nodes.is_empty() {
|
||||||
return Err(HandlerError::NoNodes);
|
return Err(HandlerError::NoNodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut collected_errors: Vec<((String, String, i64), HandlerError)> = Vec::new();
|
let mut collected_errors: Vec<((String, String, u16), HandlerError)> = Vec::new();
|
||||||
|
|
||||||
fn push_error(
|
fn push_error(
|
||||||
errors: &mut Vec<((String, String, i64), HandlerError)>,
|
errors: &mut Vec<((String, String, u16), HandlerError)>,
|
||||||
node: (String, String, i64),
|
node: (String, String, u16),
|
||||||
error: HandlerError,
|
error: HandlerError,
|
||||||
) {
|
) {
|
||||||
tracing::debug!("Proxy request to {} failed: {}", display_node(&node), error);
|
tracing::debug!("Proxy request to {} failed: {}", display_node(&node), error);
|
||||||
|
|
@ -225,7 +225,7 @@ async fn maybe_wrap_with_tls(
|
||||||
async fn proxy_to_single_node(
|
async fn proxy_to_single_node(
|
||||||
state: &crate::AppState,
|
state: &crate::AppState,
|
||||||
request: CloneableRequest,
|
request: CloneableRequest,
|
||||||
node: &(String, String, i64),
|
node: &(String, String, u16),
|
||||||
) -> Result<Response, SingleRequestError> {
|
) -> Result<Response, SingleRequestError> {
|
||||||
use crate::connection_pool::GuardedSender;
|
use crate::connection_pool::GuardedSender;
|
||||||
|
|
||||||
|
|
@ -248,38 +248,33 @@ async fn proxy_to_single_node(
|
||||||
let mut guarded_sender: Option<GuardedSender> = state.connection_pool.try_get(&key).await;
|
let mut guarded_sender: Option<GuardedSender> = state.connection_pool.try_get(&key).await;
|
||||||
|
|
||||||
if guarded_sender.is_none() {
|
if guarded_sender.is_none() {
|
||||||
// Need to build a new TCP/Tor stream.
|
// Build a new connection, and wrap it with TLS if needed.
|
||||||
let no_tls_stream: Box<dyn HyperStream> = if use_tor {
|
let address = (node.1.as_str(), node.2);
|
||||||
let tor_client = state.tor_client.as_ref().ok_or_else(|| {
|
|
||||||
SingleRequestError::ConnectionError("Tor requested but client missing".into())
|
|
||||||
})?;
|
|
||||||
let stream = timeout(
|
|
||||||
TIMEOUT,
|
|
||||||
tor_client.connect(format!("{}:{}", node.1, node.2)),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.map_err(|e| SingleRequestError::Timeout(e.to_string()))?
|
|
||||||
.map_err(|e| SingleRequestError::ConnectionError(e.to_string()))?;
|
|
||||||
|
|
||||||
Box::new(stream)
|
let maybe_tls_stream = timeout(TIMEOUT, async {
|
||||||
} else {
|
let no_tls_stream: Box<dyn HyperStream> = if use_tor {
|
||||||
let stream = timeout(
|
let tor_client = state.tor_client.as_ref().ok_or_else(|| {
|
||||||
TIMEOUT,
|
SingleRequestError::ConnectionError("Tor requested but client missing".into())
|
||||||
TcpStream::connect(format!("{}:{}", node.1, node.2)),
|
})?;
|
||||||
)
|
|
||||||
.await
|
|
||||||
.map_err(|_| SingleRequestError::Timeout("TCP connection timed out".to_string()))?
|
|
||||||
.map_err(|e| SingleRequestError::ConnectionError(e.to_string()))?;
|
|
||||||
|
|
||||||
Box::new(stream)
|
let stream = tor_client
|
||||||
};
|
.connect(address)
|
||||||
|
.await
|
||||||
|
.map_err(|e| SingleRequestError::ConnectionError(e.to_string()))?;
|
||||||
|
|
||||||
let maybe_tls_stream = timeout(
|
Box::new(stream)
|
||||||
TIMEOUT,
|
} else {
|
||||||
maybe_wrap_with_tls(no_tls_stream, &node.0, &node.1),
|
let stream = TcpStream::connect(address)
|
||||||
)
|
.await
|
||||||
|
.map_err(|e| SingleRequestError::ConnectionError(e.to_string()))?;
|
||||||
|
|
||||||
|
Box::new(stream)
|
||||||
|
};
|
||||||
|
|
||||||
|
maybe_wrap_with_tls(no_tls_stream, &node.0, &node.1).await
|
||||||
|
})
|
||||||
.await
|
.await
|
||||||
.map_err(|_| SingleRequestError::Timeout("TLS handshake timed out".to_string()))??;
|
.map_err(|_| SingleRequestError::Timeout("Connection timed out".to_string()))??;
|
||||||
|
|
||||||
// Build an HTTP/1 connection over the stream.
|
// Build an HTTP/1 connection over the stream.
|
||||||
let (sender, conn) = hyper::client::conn::http1::handshake(TokioIo::new(maybe_tls_stream))
|
let (sender, conn) = hyper::client::conn::http1::handshake(TokioIo::new(maybe_tls_stream))
|
||||||
|
|
@ -639,7 +634,7 @@ enum HandlerError {
|
||||||
PhyiscalError(SingleRequestError),
|
PhyiscalError(SingleRequestError),
|
||||||
HttpError(axum::http::StatusCode),
|
HttpError(axum::http::StatusCode),
|
||||||
JsonRpcError(String),
|
JsonRpcError(String),
|
||||||
AllRequestsFailed(Vec<((String, String, i64), HandlerError)>),
|
AllRequestsFailed(Vec<((String, String, u16), HandlerError)>),
|
||||||
CloneRequestError(String),
|
CloneRequestError(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -684,11 +679,11 @@ impl std::fmt::Display for SingleRequestError {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn display_node(node: &(String, String, i64)) -> String {
|
fn display_node(node: &(String, String, u16)) -> String {
|
||||||
format!("{}://{}:{}", node.0, node.1, node.2)
|
format!("{}://{}:{}", node.0, node.1, node.2)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn record_success(state: &AppState, scheme: &str, host: &str, port: i64, latency_ms: f64) {
|
async fn record_success(state: &AppState, scheme: &str, host: &str, port: u16, latency_ms: f64) {
|
||||||
if let Err(e) = state
|
if let Err(e) = state
|
||||||
.node_pool
|
.node_pool
|
||||||
.record_success(scheme, host, port, latency_ms)
|
.record_success(scheme, host, port, latency_ms)
|
||||||
|
|
@ -701,7 +696,7 @@ async fn record_success(state: &AppState, scheme: &str, host: &str, port: i64, l
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn record_failure(state: &AppState, scheme: &str, host: &str, port: i64) {
|
async fn record_failure(state: &AppState, scheme: &str, host: &str, port: u16) {
|
||||||
if let Err(e) = state.node_pool.record_failure(scheme, host, port).await {
|
if let Err(e) = state.node_pool.record_failure(scheme, host, port).await {
|
||||||
error!(
|
error!(
|
||||||
"Failed to record failure for {}://{}:{}: {}",
|
"Failed to record failure for {}://{}:{}: {}",
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue