diff --git a/src/invidious.cr b/src/invidious.cr index 0ae785df..4c2af431 100644 --- a/src/invidious.cr +++ b/src/invidious.cr @@ -115,7 +115,7 @@ COMPANION_POOL = Invidious::ConnectionPool::Pool.new( reinitialize_proxy: false ) do companion = CONFIG.invidious_companion.sample - next make_client(companion.private_url, use_http_proxy: false) + next make_client(companion.private_url, use_http_proxy: false, allow_auto_reconnect: false) end # CLI diff --git a/src/invidious/connection/client.cr b/src/invidious/connection/client.cr index ab3f8c50..1a49b09b 100644 --- a/src/invidious/connection/client.cr +++ b/src/invidious/connection/client.cr @@ -1,3 +1,71 @@ +module Invidious + class IVTCPSocket < TCPSocket + def initialize(host : String, port, dns_timeout = nil, connect_timeout = nil, blocking = false, family = Socket::Family::UNSPEC) + Addrinfo.tcp(host, port, timeout: dns_timeout, family: family) do |addrinfo| + super(addrinfo.family, addrinfo.type, addrinfo.protocol, blocking) + connect(addrinfo, timeout: connect_timeout) do |error| + close + error + end + end + end + end + + class HTTPClient < HTTP::Client + def initialize(uri : URI, tls : TLSContext = nil, allow_auto_reconnect : Bool = true) + tls = HTTP::Client.tls_flag(uri, tls) + host = HTTP::Client.validate_host(uri) + + super(host, uri.port, tls) + + @reconnect = allow_auto_reconnect + end + + def initialize(uri : URI, tls : TLSContext = nil, force_resolve : Socket::Family = Socket::Family::UNSPEC) + tls = HTTP::Client.tls_flag(uri, tls) + + {% if flag?(:without_openssl) %} + if tls + raise "HTTP::Client TLS is disabled because `-D without_openssl` was passed at compile time" + end + @tls = nil + {% else %} + @tls = case tls + when true + OpenSSL::SSL::Context::Client.new + when OpenSSL::SSL::Context::Client + tls + when false, nil + nil + end + {% end %} + + @host = HTTP::Client.validate_host(uri) + @port = (uri.port || (@tls ? 
443 : 80)).to_i + + tcp_socket = IVTCPSocket.new( + host: @host, + port: @port, + family: force_resolve, + ) + + if tls = @tls + begin + @io = OpenSSL::SSL::Socket::Client.new(tcp_socket, context: tls, sync_close: true, hostname: @host.rchop('.')) + rescue ex + # Don't leak the TCP socket when the SSL connection failed + tcp_socket.close + raise ex + end + else + @io = tcp_socket + end + + @reconnect = false + end + end +end + def add_yt_headers(request) request.headers.delete("User-Agent") if request.headers["User-Agent"] == "Crystal" request.headers["User-Agent"] ||= "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36" @@ -13,14 +81,21 @@ def add_yt_headers(request) end end -def make_client(url : URI, region = nil, force_resolve : Bool = false, force_youtube_headers : Bool = false, use_http_proxy : Bool = true) - client = HTTP::Client.new(url) - client.proxy = make_configured_http_proxy_client() if CONFIG.http_proxy && use_http_proxy - - # Force the usage of a specific configured IP Family - if force_resolve - client.family = CONFIG.force_resolve - client.family = Socket::Family::INET if client.family == Socket::Family::UNSPEC +def make_client( + url : URI, + region = nil, + force_resolve : Bool = false, + force_youtube_headers : Bool = true, + use_http_proxy : Bool = true, + allow_auto_reconnect : Bool = true, +) + if CONFIG.http_proxy && use_http_proxy + client = Invidious::HTTPClient.new(url) + client.proxy = make_configured_http_proxy_client() if CONFIG.http_proxy && use_http_proxy + elsif force_resolve + client = Invidious::HTTPClient.new(url, force_resolve: CONFIG.force_resolve) + else + client = Invidious::HTTPClient.new(url, allow_auto_reconnect: allow_auto_reconnect) end client.before_request { |r| add_yt_headers(r) } if url.host.try &.ends_with?("youtube.com") || force_youtube_headers diff --git a/src/invidious/connection/pool.cr b/src/invidious/connection/pool.cr index a97b9983..4b94485d 
100644 --- a/src/invidious/connection/pool.cr +++ b/src/invidious/connection/pool.cr @@ -25,10 +25,9 @@ module Invidious::ConnectionPool # Streaming API for {{method.id.upcase}} request. # The response will have its body as an `IO` accessed via `HTTP::Client::Response#body_io`. def {{method.id}}(*args, **kwargs, &) - self.checkout do | client | + self.checkout_with_retry do | client | client.{{method.id}}(*args, **kwargs) do | response | - result = yield response - return result + return yield response ensure response.body_io?.try &.skip_to_end end @@ -38,45 +37,82 @@ module Invidious::ConnectionPool # Executes a {{method.id.upcase}} request. # The response will have its body as a `String`, accessed via `HTTP::Client::Response#body`. def {{method.id}}(*args, **kwargs) - self.checkout do | client | + self.checkout_with_retry do | client | return client.{{method.id}}(*args, **kwargs) end end {% end %} # Checks out a client in the pool + # + # This method will NOT delete a client that has errored from the pool. + # Use `#checkout_with_retry` to ensure that the pool does not get poisoned. def checkout(&) - # If a client has been deleted from the pool - # we won't try to release it - client_exists_in_pool = true + pool.checkout do |client| + # When the HTTP::Client connection is closed, the automatic reconnection + # feature will create a new IO to connect to the server with + # + # This new TCP IO will be a direct connection to the server and will not go + # through the proxy. As such we'll need to reinitialize the proxy connection + client.proxy = make_configured_http_proxy_client() if @reinitialize_proxy && CONFIG.http_proxy - http_client = pool.checkout + response = yield client - # When the HTTP::Client connection is closed, the automatic reconnection - # feature will create a new IO to connect to the server with - # - # This new TCP IO will be a direct connection to the server and will not go - # through the proxy. 
As such we'll need to reinitialize the proxy connection - - http_client.proxy = make_configured_http_proxy_client() if @reinitialize_proxy && CONFIG.http_proxy - - response = yield http_client - rescue ex : DB::PoolTimeout - # Failed to checkout a client - raise ConnectionPool::PoolCheckoutError.new(ex.message) - rescue ex - # An error occurred with the client itself. - # Delete the client from the pool and close the connection - if http_client - client_exists_in_pool = false - @pool.delete(http_client) - http_client.close + return response + rescue ex : DB::PoolTimeout + # Failed to checkout a client + raise ConnectionPool::PoolCheckoutError.new(ex.message) end + end - # Raise exception for outer methods to handle - raise ConnectionPool::Error.new(ex.message, cause: ex) - ensure - pool.release(http_client) if http_client && client_exists_in_pool + # Checks out a client from the pool; retries only if a connection is lost or refused + # + # Will cycle through all of the existing connections at no delay, but any new connections + # that are created will be subject to a delay. + # + # The first attempt to make a new connection will not have the delay, but all subsequent + # attempts will. + # + # To `DB::Pool#retry`: + # - `DB::PoolResourceLost` means that the connection has been lost + # and should be deleted from the pool. 
+ # - `DB::PoolResourceRefused` means a new connection was intended to be created but failed, + # but the client can be safely released back into the pool to try again later + # + # See the following code of `crystal-db` for more information + # + # https://github.com/crystal-lang/crystal-db/blob/023dc5de90c11927656fc747601c5f08ea3c906f/src/db/pool.cr#L191 + # https://github.com/crystal-lang/crystal-db/blob/023dc5de90c11927656fc747601c5f08ea3c906f/src/db/pool_statement.cr#L41 + # https://github.com/crystal-lang/crystal-db/blob/023dc5de90c11927656fc747601c5f08ea3c906f/src/db/pool_prepared_statement.cr#L13 + # + def checkout_with_retry(&) + @pool.retry do + self.checkout do |client| + begin + return yield client + rescue ex : IO::TimeoutError + LOGGER.trace("Client: #{client} has failed to complete the request. Retrying with a new client") + raise DB::PoolResourceRefused.new + rescue ex : InfoException + raise ex + rescue ex : Exception + # Any other errors should cause the client to be deleted from the pool + + # This means that the client is closed and needs to be deleted from the pool + # due to its inability to reconnect + if ex.message == "This HTTP::Client cannot be reconnected" + LOGGER.trace("Checked out client is closed and cannot be reconnected. 
Trying the next retry attempt...") + else + LOGGER.error("Client: #{client} has encountered an error: #{ex} #{ex.message} and will be removed from the pool") + end + + raise DB::PoolResourceLost(HTTP::Client).new(client) + end + end + rescue ex : DB::PoolRetryAttemptsExceeded + raise PoolRetryAttemptsExceeded.new + end end end @@ -87,6 +123,10 @@ module Invidious::ConnectionPool class PoolCheckoutError < Error end + # Raised when too many retries + class PoolRetryAttemptsExceeded < Error + end + # Mapping of subdomain => Invidious::ConnectionPool::Pool # This is needed as we may need to access arbitrary subdomains of ytimg private YTIMG_POOLS = {} of String => ConnectionPool::Pool diff --git a/src/invidious/helpers/crystal_class_overrides.cr b/src/invidious/helpers/crystal_class_overrides.cr index fec3f62c..e2a5de6d 100644 --- a/src/invidious/helpers/crystal_class_overrides.cr +++ b/src/invidious/helpers/crystal_class_overrides.cr @@ -1,53 +1,3 @@ -# Override of the TCPSocket and HTTP::Client classes in order to allow an -# IP family to be selected for domains that resolve to both IPv4 and -# IPv6 addresses. -# -class TCPSocket - def initialize(host, port, dns_timeout = nil, connect_timeout = nil, blocking = false, family = Socket::Family::UNSPEC) - Addrinfo.tcp(host, port, timeout: dns_timeout, family: family) do |addrinfo| - super(addrinfo.family, addrinfo.type, addrinfo.protocol, blocking) - connect(addrinfo, timeout: connect_timeout) do |error| - close - error - end - end - end -end - -# :ditto: -class HTTP::Client - property family : Socket::Family = Socket::Family::UNSPEC - - private def io - io = @io - return io if io - unless @reconnect - raise "This HTTP::Client cannot be reconnected" - end - - hostname = @host.starts_with?('[') && @host.ends_with?(']') ? 
@host[1..-2] : @host - io = TCPSocket.new hostname, @port, @dns_timeout, @connect_timeout, family: @family - io.read_timeout = @read_timeout if @read_timeout - io.write_timeout = @write_timeout if @write_timeout - io.sync = false - - {% if !flag?(:without_openssl) %} - if tls = @tls - tcp_socket = io - begin - io = OpenSSL::SSL::Socket::Client.new(tcp_socket, context: tls, sync_close: true, hostname: @host.rchop('.')) - rescue exc - # don't leak the TCP socket when the SSL connection failed - tcp_socket.close - raise exc - end - end - {% end %} - - @io = io - end -end - # Mute the ClientError exception raised when a connection is flushed. # This happends when the connection is unexpectedly closed by the client. #