mirror of
https://github.com/iv-org/invidious.git
synced 2025-04-20 23:46:26 -04:00
Rework HTTP::Client logic
Implement force resolve via IO constructor Use `DB::Pool#retry` in connection pool `HTTP::Clients` created through its IO constructor cannot reconnect to closed connections, and instead will result in an error on usage. The `DB::Pool#retry` allows us to cycle until we find a client that that still has an open connection, or to create a new one if necessary. The poisoned clients are then removed once identified by `#retry`'s internal logic The fact that clients with closed connections will now be slowly removed also means that Invidious will no longer use the same pattern of companion connections within the pool; distributing routes more evenly.
This commit is contained in:
parent
ccbbc45361
commit
178628529e
@ -1,3 +1,62 @@
|
||||
module Invidious
|
||||
class IVTCPSocket < TCPSocket
|
||||
def initialize(host : String, port, dns_timeout = nil, connect_timeout = nil, blocking = false, family = Socket::Family::UNSPEC)
|
||||
Addrinfo.tcp(host, port, timeout: dns_timeout, family: family) do |addrinfo|
|
||||
super(addrinfo.family, addrinfo.type, addrinfo.protocol, blocking)
|
||||
connect(addrinfo, timeout: connect_timeout) do |error|
|
||||
close
|
||||
error
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
class HTTPClient < HTTP::Client
|
||||
def initialize(uri : URI, tls : TLSContext = nil, force_resolve : Socket::Family = Socket::Family::UNSPEC)
|
||||
tls = HTTP::Client.tls_flag(uri, tls)
|
||||
|
||||
{% if flag?(:without_openssl) %}
|
||||
if tls
|
||||
raise "HTTP::Client TLS is disabled because `-D without_openssl` was passed at compile time"
|
||||
end
|
||||
@tls = nil
|
||||
{% else %}
|
||||
@tls = case tls
|
||||
when true
|
||||
OpenSSL::SSL::Context::Client.new
|
||||
when OpenSSL::SSL::Context::Client
|
||||
tls
|
||||
when false, nil
|
||||
nil
|
||||
end
|
||||
{% end %}
|
||||
|
||||
@host = HTTP::Client.validate_host(uri)
|
||||
@port = (uri.port || (@tls ? 443 : 80)).to_i
|
||||
|
||||
tcp_socket = IVTCPSocket.new(
|
||||
host: @host,
|
||||
port: @port,
|
||||
family: force_resolve,
|
||||
)
|
||||
|
||||
if tls = @tls
|
||||
begin
|
||||
@io = OpenSSL::SSL::Socket::Client.new(tcp_socket, context: tls, sync_close: true, hostname: @host.rchop('.'))
|
||||
rescue ex
|
||||
# Don't leak the TCP socket when the SSL connection failed
|
||||
tcp_socket.close
|
||||
raise ex
|
||||
end
|
||||
else
|
||||
@io = tcp_socket
|
||||
end
|
||||
|
||||
@reconnect = false
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def add_yt_headers(request)
|
||||
request.headers.delete("User-Agent") if request.headers["User-Agent"] == "Crystal"
|
||||
request.headers["User-Agent"] ||= "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
|
||||
@ -13,14 +72,14 @@ def add_yt_headers(request)
|
||||
end
|
||||
end
|
||||
|
||||
def make_client(url : URI, region = nil, force_resolve : Bool = false, force_youtube_headers : Bool = false, use_http_proxy : Bool = true)
|
||||
client = HTTP::Client.new(url)
|
||||
client.proxy = make_configured_http_proxy_client() if CONFIG.http_proxy && use_http_proxy
|
||||
|
||||
# Force the usage of a specific configured IP Family
|
||||
if force_resolve
|
||||
client.family = CONFIG.force_resolve
|
||||
client.family = Socket::Family::INET if client.family == Socket::Family::UNSPEC
|
||||
def make_client(url : URI, region = nil, force_resolve : Bool = false, force_youtube_headers : Bool = true, use_http_proxy : Bool = true)
|
||||
if CONFIG.http_proxy && use_http_proxy
|
||||
client = Invidious::HTTPClient.new(url)
|
||||
client.proxy = make_configured_http_proxy_client() if CONFIG.http_proxy && use_http_proxy
|
||||
elsif force_resolve
|
||||
client = Invidious::HTTPClient.new(url, force_resolve: CONFIG.force_resolve)
|
||||
else
|
||||
client = Invidious::HTTPClient.new(url)
|
||||
end
|
||||
|
||||
client.before_request { |r| add_yt_headers(r) } if url.host.try &.ends_with?("youtube.com") || force_youtube_headers
|
||||
|
@ -25,10 +25,9 @@ module Invidious::ConnectionPool
|
||||
# Streaming API for {{method.id.upcase}} request.
|
||||
# The response will have its body as an `IO` accessed via `HTTP::Client::Response#body_io`.
|
||||
def {{method.id}}(*args, **kwargs, &)
|
||||
self.checkout do | client |
|
||||
self.checkout_with_retry do | client |
|
||||
client.{{method.id}}(*args, **kwargs) do | response |
|
||||
result = yield response
|
||||
return result
|
||||
return yield response
|
||||
ensure
|
||||
response.body_io?.try &.skip_to_end
|
||||
end
|
||||
@ -38,45 +37,82 @@ module Invidious::ConnectionPool
|
||||
# Executes a {{method.id.upcase}} request.
|
||||
# The response will have its body as a `String`, accessed via `HTTP::Client::Response#body`.
|
||||
def {{method.id}}(*args, **kwargs)
|
||||
self.checkout do | client |
|
||||
self.checkout_with_retry do | client |
|
||||
return client.{{method.id}}(*args, **kwargs)
|
||||
end
|
||||
end
|
||||
{% end %}
|
||||
|
||||
# Checks out a client in the pool
|
||||
#
|
||||
# This method will NOT delete a client that has errored from the pool.
|
||||
# Use `#checkout_with_retry` to ensure that the pool does not get poisoned.
|
||||
def checkout(&)
|
||||
# If a client has been deleted from the pool
|
||||
# we won't try to release it
|
||||
client_exists_in_pool = true
|
||||
pool.checkout do |client|
|
||||
# When the HTTP::Client connection is closed, the automatic reconnection
|
||||
# feature will create a new IO to connect to the server with
|
||||
#
|
||||
# This new TCP IO will be a direct connection to the server and will not go
|
||||
# through the proxy. As such we'll need to reinitialize the proxy connection
|
||||
client.proxy = make_configured_http_proxy_client() if @reinitialize_proxy && CONFIG.http_proxy
|
||||
|
||||
http_client = pool.checkout
|
||||
response = yield client
|
||||
|
||||
# When the HTTP::Client connection is closed, the automatic reconnection
|
||||
# feature will create a new IO to connect to the server with
|
||||
#
|
||||
# This new TCP IO will be a direct connection to the server and will not go
|
||||
# through the proxy. As such we'll need to reinitialize the proxy connection
|
||||
|
||||
http_client.proxy = make_configured_http_proxy_client() if @reinitialize_proxy && CONFIG.http_proxy
|
||||
|
||||
response = yield http_client
|
||||
rescue ex : DB::PoolTimeout
|
||||
# Failed to checkout a client
|
||||
raise ConnectionPool::PoolCheckoutError.new(ex.message)
|
||||
rescue ex
|
||||
# An error occurred with the client itself.
|
||||
# Delete the client from the pool and close the connection
|
||||
if http_client
|
||||
client_exists_in_pool = false
|
||||
@pool.delete(http_client)
|
||||
http_client.close
|
||||
return response
|
||||
rescue ex : DB::PoolTimeout
|
||||
# Failed to checkout a client
|
||||
raise ConnectionPool::PoolCheckoutError.new(ex.message)
|
||||
end
|
||||
end
|
||||
|
||||
# Raise exception for outer methods to handle
|
||||
raise ConnectionPool::Error.new(ex.message, cause: ex)
|
||||
ensure
|
||||
pool.release(http_client) if http_client && client_exists_in_pool
|
||||
# Checks out a client from the pool; retries only if a connection is lost or refused
|
||||
#
|
||||
# Will cycle through all of the existing connections at no delay, but any new connections
|
||||
# that is created will be subject to a delay.
|
||||
#
|
||||
# The first attempt to make a new connection will not have the delay, but all subsequent
|
||||
# attempts will.
|
||||
#
|
||||
# To `DB::Pool#retry`:
|
||||
# - `DB::PoolResourceLost` means that the connection has been lost
|
||||
# and should be deleted from the pool.
|
||||
#
|
||||
# - `DB::PoolResourceRefused` means a new connection was intended to be created but failed
|
||||
# but the client can be safely released back into the pool to try again later with
|
||||
#
|
||||
# See the following code of `crystal-db` for more information
|
||||
#
|
||||
# https://github.com/crystal-lang/crystal-db/blob/023dc5de90c11927656fc747601c5f08ea3c906f/src/db/pool.cr#L191
|
||||
# https://github.com/crystal-lang/crystal-db/blob/023dc5de90c11927656fc747601c5f08ea3c906f/src/db/pool_statement.cr#L41
|
||||
# https://github.com/crystal-lang/crystal-db/blob/023dc5de90c11927656fc747601c5f08ea3c906f/src/db/pool_prepared_statement.cr#L13
|
||||
#
|
||||
def checkout_with_retry(&)
|
||||
@pool.retry do
|
||||
self.checkout do |client|
|
||||
begin
|
||||
return yield client
|
||||
rescue ex : IO::TimeoutError
|
||||
LOGGER.trace("Client: #{client} has failed to complete the request. Retrying with a new client")
|
||||
raise DB::PoolResourceRefused.new
|
||||
rescue ex : InfoException
|
||||
raise ex
|
||||
rescue ex : Exception
|
||||
# Any other errors should cause the client to be deleted from the pool
|
||||
|
||||
# This means that the client is closed and needs to be deleted from the pool
|
||||
# due its inability to reconnect
|
||||
if ex.message == "This HTTP::Client cannot be reconnected"
|
||||
LOGGER.trace("Checked out client is closed and cannot be reconnected. Trying the next retry attempt...")
|
||||
else
|
||||
LOGGER.error("Client: #{client} has encountered an error: #{ex} #{ex.message} and will be removed from the pool")
|
||||
end
|
||||
|
||||
raise DB::PoolResourceLost(HTTP::Client).new(client)
|
||||
end
|
||||
end
|
||||
rescue ex : DB::PoolRetryAttemptsExceeded
|
||||
raise PoolRetryAttemptsExceeded.new
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@ -87,6 +123,10 @@ module Invidious::ConnectionPool
|
||||
class PoolCheckoutError < Error
|
||||
end
|
||||
|
||||
# Raised when too many retries
|
||||
class PoolRetryAttemptsExceeded < Error
|
||||
end
|
||||
|
||||
# Mapping of subdomain => Invidious::ConnectionPool::Pool
|
||||
# This is needed as we may need to access arbitrary subdomains of ytimg
|
||||
private YTIMG_POOLS = {} of String => ConnectionPool::Pool
|
||||
|
@ -1,53 +1,3 @@
|
||||
# Override of the TCPSocket and HTTP::Client classes in order to allow an
|
||||
# IP family to be selected for domains that resolve to both IPv4 and
|
||||
# IPv6 addresses.
|
||||
#
|
||||
class TCPSocket
|
||||
def initialize(host, port, dns_timeout = nil, connect_timeout = nil, blocking = false, family = Socket::Family::UNSPEC)
|
||||
Addrinfo.tcp(host, port, timeout: dns_timeout, family: family) do |addrinfo|
|
||||
super(addrinfo.family, addrinfo.type, addrinfo.protocol, blocking)
|
||||
connect(addrinfo, timeout: connect_timeout) do |error|
|
||||
close
|
||||
error
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# :ditto:
|
||||
class HTTP::Client
|
||||
property family : Socket::Family = Socket::Family::UNSPEC
|
||||
|
||||
private def io
|
||||
io = @io
|
||||
return io if io
|
||||
unless @reconnect
|
||||
raise "This HTTP::Client cannot be reconnected"
|
||||
end
|
||||
|
||||
hostname = @host.starts_with?('[') && @host.ends_with?(']') ? @host[1..-2] : @host
|
||||
io = TCPSocket.new hostname, @port, @dns_timeout, @connect_timeout, family: @family
|
||||
io.read_timeout = @read_timeout if @read_timeout
|
||||
io.write_timeout = @write_timeout if @write_timeout
|
||||
io.sync = false
|
||||
|
||||
{% if !flag?(:without_openssl) %}
|
||||
if tls = @tls
|
||||
tcp_socket = io
|
||||
begin
|
||||
io = OpenSSL::SSL::Socket::Client.new(tcp_socket, context: tls, sync_close: true, hostname: @host.rchop('.'))
|
||||
rescue exc
|
||||
# don't leak the TCP socket when the SSL connection failed
|
||||
tcp_socket.close
|
||||
raise exc
|
||||
end
|
||||
end
|
||||
{% end %}
|
||||
|
||||
@io = io
|
||||
end
|
||||
end
|
||||
|
||||
# Mute the ClientError exception raised when a connection is flushed.
|
||||
# This happends when the connection is unexpectedly closed by the client.
|
||||
#
|
||||
|
Loading…
x
Reference in New Issue
Block a user