From 178628529e668be42aa5aa57545932ddbf0c361d Mon Sep 17 00:00:00 2001 From: syeopite Date: Thu, 10 Apr 2025 00:04:36 -0700 Subject: [PATCH 1/2] Rework HTTP::Client logic Implement force resolve via IO constructor Use `DB::Pool#retry` in connection pool `HTTP::Clients` created through its IO constructor cannot reconnect to closed connections, and instead will result in an error on usage. The `DB::Pool#retry` allows us to cycle until we find a client that that still has an open connection, or to create a new one if necessary. The poisoned clients are then removed once identified by `#retry`'s internal logic The fact that clients with closed connections will now be slowly removed also means that Invidious will no longer use the same pattern of companion connections within the pool; distributing routes more evenly. --- src/invidious/connection/client.cr | 75 +++++++++++-- src/invidious/connection/pool.cr | 102 ++++++++++++------ .../helpers/crystal_class_overrides.cr | 50 --------- 3 files changed, 138 insertions(+), 89 deletions(-) diff --git a/src/invidious/connection/client.cr b/src/invidious/connection/client.cr index ab3f8c50..4a0cbade 100644 --- a/src/invidious/connection/client.cr +++ b/src/invidious/connection/client.cr @@ -1,3 +1,62 @@ +module Invidious + class IVTCPSocket < TCPSocket + def initialize(host : String, port, dns_timeout = nil, connect_timeout = nil, blocking = false, family = Socket::Family::UNSPEC) + Addrinfo.tcp(host, port, timeout: dns_timeout, family: family) do |addrinfo| + super(addrinfo.family, addrinfo.type, addrinfo.protocol, blocking) + connect(addrinfo, timeout: connect_timeout) do |error| + close + error + end + end + end + end + + class HTTPClient < HTTP::Client + def initialize(uri : URI, tls : TLSContext = nil, force_resolve : Socket::Family = Socket::Family::UNSPEC) + tls = HTTP::Client.tls_flag(uri, tls) + + {% if flag?(:without_openssl) %} + if tls + raise "HTTP::Client TLS is disabled because `-D without_openssl` was passed at compile time" + end + @tls = nil + {% else %} + @tls = case tls + when true + OpenSSL::SSL::Context::Client.new + when OpenSSL::SSL::Context::Client + tls + when false, nil + nil + end + {% end %} + + @host = HTTP::Client.validate_host(uri) + @port = (uri.port || (@tls ? 443 : 80)).to_i + + tcp_socket = IVTCPSocket.new( + host: @host, + port: @port, + family: force_resolve, + ) + + if tls = @tls + begin + @io = OpenSSL::SSL::Socket::Client.new(tcp_socket, context: tls, sync_close: true, hostname: @host.rchop('.')) + rescue ex + # Don't leak the TCP socket when the SSL connection failed + tcp_socket.close + raise ex + end + else + @io = tcp_socket + end + + @reconnect = false + end + end +end + def add_yt_headers(request) request.headers.delete("User-Agent") if request.headers["User-Agent"] == "Crystal" request.headers["User-Agent"] ||= "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36" @@ -13,14 +72,14 @@ def add_yt_headers(request) end end -def make_client(url : URI, region = nil, force_resolve : Bool = false, force_youtube_headers : Bool = false, use_http_proxy : Bool = true) - client = HTTP::Client.new(url) - client.proxy = make_configured_http_proxy_client() if CONFIG.http_proxy && use_http_proxy - - # Force the usage of a specific configured IP Family - if force_resolve - client.family = CONFIG.force_resolve - client.family = Socket::Family::INET if client.family == Socket::Family::UNSPEC +def make_client(url : URI, region = nil, force_resolve : Bool = false, force_youtube_headers : Bool = true, use_http_proxy : Bool = true) + if CONFIG.http_proxy && use_http_proxy + client = Invidious::HTTPClient.new(url) + client.proxy = make_configured_http_proxy_client() if CONFIG.http_proxy && use_http_proxy + elsif force_resolve + client = Invidious::HTTPClient.new(url, force_resolve: CONFIG.force_resolve) + else + client = Invidious::HTTPClient.new(url) end client.before_request { |r| add_yt_headers(r) } if url.host.try &.ends_with?("youtube.com") || force_youtube_headers diff --git a/src/invidious/connection/pool.cr b/src/invidious/connection/pool.cr index a97b9983..4b94485d 100644 --- a/src/invidious/connection/pool.cr +++ b/src/invidious/connection/pool.cr @@ -25,10 +25,9 @@ module Invidious::ConnectionPool # Streaming API for {{method.id.upcase}} request. # The response will have its body as an `IO` accessed via `HTTP::Client::Response#body_io`. def {{method.id}}(*args, **kwargs, &) - self.checkout do | client | + self.checkout_with_retry do | client | client.{{method.id}}(*args, **kwargs) do | response | - result = yield response - return result + return yield response ensure response.body_io?.try &.skip_to_end end @@ -38,45 +37,82 @@ module Invidious::ConnectionPool # Executes a {{method.id.upcase}} request. # The response will have its body as a `String`, accessed via `HTTP::Client::Response#body`. def {{method.id}}(*args, **kwargs) - self.checkout do | client | + self.checkout_with_retry do | client | return client.{{method.id}}(*args, **kwargs) end end {% end %} # Checks out a client in the pool + # + # This method will NOT delete a client that has errored from the pool. + # Use `#checkout_with_retry` to ensure that the pool does not get poisoned. def checkout(&) - # If a client has been deleted from the pool - # we won't try to release it - client_exists_in_pool = true + pool.checkout do |client| + # When the HTTP::Client connection is closed, the automatic reconnection + # feature will create a new IO to connect to the server with + # + # This new TCP IO will be a direct connection to the server and will not go + # through the proxy. As such we'll need to reinitialize the proxy connection + client.proxy = make_configured_http_proxy_client() if @reinitialize_proxy && CONFIG.http_proxy - http_client = pool.checkout + response = yield client - # When the HTTP::Client connection is closed, the automatic reconnection - # feature will create a new IO to connect to the server with - # - # This new TCP IO will be a direct connection to the server and will not go - # through the proxy. As such we'll need to reinitialize the proxy connection - - http_client.proxy = make_configured_http_proxy_client() if @reinitialize_proxy && CONFIG.http_proxy - - response = yield http_client - rescue ex : DB::PoolTimeout - # Failed to checkout a client - raise ConnectionPool::PoolCheckoutError.new(ex.message) - rescue ex - # An error occurred with the client itself. - # Delete the client from the pool and close the connection - if http_client - client_exists_in_pool = false - @pool.delete(http_client) - http_client.close + return response + rescue ex : DB::PoolTimeout + # Failed to checkout a client + raise ConnectionPool::PoolCheckoutError.new(ex.message) end + end - # Raise exception for outer methods to handle - raise ConnectionPool::Error.new(ex.message, cause: ex) - ensure - pool.release(http_client) if http_client && client_exists_in_pool + # Checks out a client from the pool; retries only if a connection is lost or refused + # + # Will cycle through all of the existing connections at no delay, but any new connections + # that is created will be subject to a delay. + # + # The first attempt to make a new connection will not have the delay, but all subsequent + # attempts will. + # + # To `DB::Pool#retry`: + # - `DB::PoolResourceLost` means that the connection has been lost + # and should be deleted from the pool. + # + # - `DB::PoolResourceRefused` means a new connection was intended to be created but failed + # but the client can be safely released back into the pool to try again later with + # + # See the following code of `crystal-db` for more information + # + # https://github.com/crystal-lang/crystal-db/blob/023dc5de90c11927656fc747601c5f08ea3c906f/src/db/pool.cr#L191 + # https://github.com/crystal-lang/crystal-db/blob/023dc5de90c11927656fc747601c5f08ea3c906f/src/db/pool_statement.cr#L41 + # https://github.com/crystal-lang/crystal-db/blob/023dc5de90c11927656fc747601c5f08ea3c906f/src/db/pool_prepared_statement.cr#L13 + # + def checkout_with_retry(&) + @pool.retry do + self.checkout do |client| + begin + return yield client + rescue ex : IO::TimeoutError + LOGGER.trace("Client: #{client} has failed to complete the request. Retrying with a new client") + raise DB::PoolResourceRefused.new + rescue ex : InfoException + raise ex + rescue ex : Exception + # Any other errors should cause the client to be deleted from the pool + + # This means that the client is closed and needs to be deleted from the pool + # due its inability to reconnect + if ex.message == "This HTTP::Client cannot be reconnected" + LOGGER.trace("Checked out client is closed and cannot be reconnected. Trying the next retry attempt...") + else + LOGGER.error("Client: #{client} has encountered an error: #{ex} #{ex.message} and will be removed from the pool") + end + + raise DB::PoolResourceLost(HTTP::Client).new(client) + end + end + rescue ex : DB::PoolRetryAttemptsExceeded + raise PoolRetryAttemptsExceeded.new + end end end @@ -87,6 +123,10 @@ module Invidious::ConnectionPool class PoolCheckoutError < Error end + # Raised when too many retries + class PoolRetryAttemptsExceeded < Error + end + # Mapping of subdomain => Invidious::ConnectionPool::Pool # This is needed as we may need to access arbitrary subdomains of ytimg private YTIMG_POOLS = {} of String => ConnectionPool::Pool diff --git a/src/invidious/helpers/crystal_class_overrides.cr b/src/invidious/helpers/crystal_class_overrides.cr index fec3f62c..e2a5de6d 100644 --- a/src/invidious/helpers/crystal_class_overrides.cr +++ b/src/invidious/helpers/crystal_class_overrides.cr @@ -1,53 +1,3 @@ -# Override of the TCPSocket and HTTP::Client classes in order to allow an -# IP family to be selected for domains that resolve to both IPv4 and -# IPv6 addresses. -# -class TCPSocket - def initialize(host, port, dns_timeout = nil, connect_timeout = nil, blocking = false, family = Socket::Family::UNSPEC) - Addrinfo.tcp(host, port, timeout: dns_timeout, family: family) do |addrinfo| - super(addrinfo.family, addrinfo.type, addrinfo.protocol, blocking) - connect(addrinfo, timeout: connect_timeout) do |error| - close - error - end - end - end -end - -# :ditto: -class HTTP::Client - property family : Socket::Family = Socket::Family::UNSPEC - - private def io - io = @io - return io if io - unless @reconnect - raise "This HTTP::Client cannot be reconnected" - end - - hostname = @host.starts_with?('[') && @host.ends_with?(']') ? @host[1..-2] : @host - io = TCPSocket.new hostname, @port, @dns_timeout, @connect_timeout, family: @family - io.read_timeout = @read_timeout if @read_timeout - io.write_timeout = @write_timeout if @write_timeout - io.sync = false - - {% if !flag?(:without_openssl) %} - if tls = @tls - tcp_socket = io - begin - io = OpenSSL::SSL::Socket::Client.new(tcp_socket, context: tls, sync_close: true, hostname: @host.rchop('.')) - rescue exc - # don't leak the TCP socket when the SSL connection failed - tcp_socket.close - raise exc - end - end - {% end %} - - @io = io - end -end - # Mute the ClientError exception raised when a connection is flushed. # This happends when the connection is unexpectedly closed by the client. # From 22b76bf20bf48ca6673235f80aed63a51769154c Mon Sep 17 00:00:00 2001 From: syeopite Date: Thu, 10 Apr 2025 01:02:59 -0700 Subject: [PATCH 2/2] Disable auto reconnect for companion pool clients --- src/invidious.cr | 2 +- src/invidious/connection/client.cr | 20 ++++++++++++++++++-- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/src/invidious.cr b/src/invidious.cr index 0ae785df..4c2af431 100644 --- a/src/invidious.cr +++ b/src/invidious.cr @@ -115,7 +115,7 @@ COMPANION_POOL = Invidious::ConnectionPool::Pool.new( reinitialize_proxy: false ) do companion = CONFIG.invidious_companion.sample - next make_client(companion.private_url, use_http_proxy: false) + next make_client(companion.private_url, use_http_proxy: false, allow_auto_reconnect: false) end # CLI diff --git a/src/invidious/connection/client.cr b/src/invidious/connection/client.cr index 4a0cbade..1a49b09b 100644 --- a/src/invidious/connection/client.cr +++ b/src/invidious/connection/client.cr @@ -12,6 +12,15 @@ module Invidious end class HTTPClient < HTTP::Client + def initialize(uri : URI, tls : TLSContext = nil, allow_auto_reconnect : Bool = true) + tls = HTTP::Client.tls_flag(uri, tls) + host = HTTP::Client.validate_host(uri) + + super(host, uri.port, tls) + + @reconnect = allow_auto_reconnect + end + def initialize(uri : URI, tls : TLSContext = nil, force_resolve : Socket::Family = Socket::Family::UNSPEC) tls = HTTP::Client.tls_flag(uri, tls) @@ -72,14 +81,21 @@ def add_yt_headers(request) end end -def make_client(url : URI, region = nil, force_resolve : Bool = false, force_youtube_headers : Bool = true, use_http_proxy : Bool = true) +def make_client( + url : URI, + region = nil, + force_resolve : Bool = false, + force_youtube_headers : Bool = true, + use_http_proxy : Bool = true, + allow_auto_reconnect : Bool = true, +) if CONFIG.http_proxy && use_http_proxy client = Invidious::HTTPClient.new(url) client.proxy = make_configured_http_proxy_client() if CONFIG.http_proxy && use_http_proxy elsif force_resolve client = Invidious::HTTPClient.new(url, force_resolve: CONFIG.force_resolve) else - client = Invidious::HTTPClient.new(url) + client = Invidious::HTTPClient.new(url, allow_auto_reconnect: allow_auto_reconnect) end client.before_request { |r| add_yt_headers(r) } if url.host.try &.ends_with?("youtube.com") || force_youtube_headers