diff --git a/monero-rpc-pool/src/connection_pool.rs b/monero-rpc-pool/src/connection_pool.rs index 0be53d70..8013afdc 100644 --- a/monero-rpc-pool/src/connection_pool.rs +++ b/monero-rpc-pool/src/connection_pool.rs @@ -38,7 +38,7 @@ use axum::body::Body; use tokio::sync::{Mutex, OwnedMutexGuard, RwLock}; /// Key for the map – `(scheme, host, port, via_tor)`. -pub type StreamKey = (String, String, i64, bool); +pub type StreamKey = (String, String, u16, bool); /// Alias for hyper's HTTP/1 sender. pub type HttpSender = hyper::client::conn::http1::SendRequest; diff --git a/monero-rpc-pool/src/database.rs b/monero-rpc-pool/src/database.rs index 0d08260a..8843a966 100644 --- a/monero-rpc-pool/src/database.rs +++ b/monero-rpc-pool/src/database.rs @@ -66,7 +66,7 @@ impl Database { &self, scheme: &str, host: &str, - port: i64, + port: u16, was_successful: bool, latency_ms: Option, ) -> Result<()> { diff --git a/monero-rpc-pool/src/pool.rs b/monero-rpc-pool/src/pool.rs index 67fd25b1..e092b649 100644 --- a/monero-rpc-pool/src/pool.rs +++ b/monero-rpc-pool/src/pool.rs @@ -122,7 +122,7 @@ impl NodePool { &self, scheme: &str, host: &str, - port: i64, + port: u16, latency_ms: f64, ) -> Result<()> { self.db @@ -131,7 +131,7 @@ impl NodePool { Ok(()) } - pub async fn record_failure(&self, scheme: &str, host: &str, port: i64) -> Result<()> { + pub async fn record_failure(&self, scheme: &str, host: &str, port: u16) -> Result<()> { self.db .record_health_check(scheme, host, port, false, None) .await?; diff --git a/monero-rpc-pool/src/proxy.rs b/monero-rpc-pool/src/proxy.rs index f451d830..2c411560 100644 --- a/monero-rpc-pool/src/proxy.rs +++ b/monero-rpc-pool/src/proxy.rs @@ -40,9 +40,9 @@ pub async fn proxy_handler(State(state): State, request: Request) -> R .await .map_err(|e| HandlerError::PoolError(e.to_string())) .map(|nodes| { - let pool: Vec<(String, String, i64)> = nodes + let pool: Vec<(String, String, u16)> = nodes .into_iter() - .map(|node| (node.scheme, node.host, node.port as i64)) + .map(|node| (node.scheme, node.host, node.port)) .collect(); pool @@ -82,17 +82,17 @@ pub async fn proxy_handler(State(state): State, request: Request) -> R async fn proxy_to_multiple_nodes( state: &AppState, request: CloneableRequest, - nodes: Vec<(String, String, i64)>, + nodes: Vec<(String, String, u16)>, ) -> Result { if nodes.is_empty() { return Err(HandlerError::NoNodes); } - let mut collected_errors: Vec<((String, String, i64), HandlerError)> = Vec::new(); + let mut collected_errors: Vec<((String, String, u16), HandlerError)> = Vec::new(); fn push_error( - errors: &mut Vec<((String, String, i64), HandlerError)>, - node: (String, String, i64), + errors: &mut Vec<((String, String, u16), HandlerError)>, + node: (String, String, u16), error: HandlerError, ) { tracing::debug!("Proxy request to {} failed: {}", display_node(&node), error); @@ -225,7 +225,7 @@ async fn maybe_wrap_with_tls( async fn proxy_to_single_node( state: &crate::AppState, request: CloneableRequest, - node: &(String, String, i64), + node: &(String, String, u16), ) -> Result { use crate::connection_pool::GuardedSender; @@ -248,38 +248,33 @@ async fn proxy_to_single_node( let mut guarded_sender: Option = state.connection_pool.try_get(&key).await; if guarded_sender.is_none() { - // Need to build a new TCP/Tor stream. - let no_tls_stream: Box = if use_tor { - let tor_client = state.tor_client.as_ref().ok_or_else(|| { - SingleRequestError::ConnectionError("Tor requested but client missing".into()) - })?; - let stream = timeout( - TIMEOUT, - tor_client.connect(format!("{}:{}", node.1, node.2)), - ) - .await - .map_err(|e| SingleRequestError::Timeout(e.to_string()))? - .map_err(|e| SingleRequestError::ConnectionError(e.to_string()))?; + // Build a new connection, and wrap it with TLS if needed. + let address = (node.1.as_str(), node.2); - Box::new(stream) - } else { - let stream = timeout( - TIMEOUT, - TcpStream::connect(format!("{}:{}", node.1, node.2)), - ) - .await - .map_err(|_| SingleRequestError::Timeout("TCP connection timed out".to_string()))? - .map_err(|e| SingleRequestError::ConnectionError(e.to_string()))?; + let maybe_tls_stream = timeout(TIMEOUT, async { + let no_tls_stream: Box = if use_tor { + let tor_client = state.tor_client.as_ref().ok_or_else(|| { + SingleRequestError::ConnectionError("Tor requested but client missing".into()) + })?; - Box::new(stream) - }; + let stream = tor_client + .connect(address) + .await + .map_err(|e| SingleRequestError::ConnectionError(e.to_string()))?; - let maybe_tls_stream = timeout( - TIMEOUT, - maybe_wrap_with_tls(no_tls_stream, &node.0, &node.1), - ) + Box::new(stream) + } else { + let stream = TcpStream::connect(address) + .await + .map_err(|e| SingleRequestError::ConnectionError(e.to_string()))?; + + Box::new(stream) + }; + + maybe_wrap_with_tls(no_tls_stream, &node.0, &node.1).await + }) .await - .map_err(|_| SingleRequestError::Timeout("TLS handshake timed out".to_string()))??; + .map_err(|_| SingleRequestError::Timeout("Connection timed out".to_string()))??; // Build an HTTP/1 connection over the stream. let (sender, conn) = hyper::client::conn::http1::handshake(TokioIo::new(maybe_tls_stream)) @@ -639,7 +634,7 @@ enum HandlerError { PhyiscalError(SingleRequestError), HttpError(axum::http::StatusCode), JsonRpcError(String), - AllRequestsFailed(Vec<((String, String, i64), HandlerError)>), + AllRequestsFailed(Vec<((String, String, u16), HandlerError)>), CloneRequestError(String), } @@ -684,11 +679,11 @@ impl std::fmt::Display for SingleRequestError { } } -fn display_node(node: &(String, String, i64)) -> String { +fn display_node(node: &(String, String, u16)) -> String { format!("{}://{}:{}", node.0, node.1, node.2) } -async fn record_success(state: &AppState, scheme: &str, host: &str, port: i64, latency_ms: f64) { +async fn record_success(state: &AppState, scheme: &str, host: &str, port: u16, latency_ms: f64) { if let Err(e) = state .node_pool .record_success(scheme, host, port, latency_ms) @@ -701,7 +696,7 @@ async fn record_success(state: &AppState, scheme: &str, host: &str, port: i64, l } } -async fn record_failure(state: &AppState, scheme: &str, host: &str, port: i64) { +async fn record_failure(state: &AppState, scheme: &str, host: &str, port: u16) { if let Err(e) = state.node_pool.record_failure(scheme, host, port).await { error!( "Failed to record failure for {}://{}:{}: {}",