mirror of
https://github.com/iv-org/invidious.git
synced 2025-04-20 23:46:26 -04:00
Merge 22b76bf20bf48ca6673235f80aed63a51769154c into ccbbc453617d841c5020f20071a2ea6ec470979a
This commit is contained in:
commit
aaa78dad1c
@ -115,7 +115,7 @@ COMPANION_POOL = Invidious::ConnectionPool::Pool.new(
|
||||
reinitialize_proxy: false
|
||||
) do
|
||||
companion = CONFIG.invidious_companion.sample
|
||||
next make_client(companion.private_url, use_http_proxy: false)
|
||||
next make_client(companion.private_url, use_http_proxy: false, allow_auto_reconnect: false)
|
||||
end
|
||||
|
||||
# CLI
|
||||
|
@ -1,3 +1,71 @@
|
||||
module Invidious
|
||||
class IVTCPSocket < TCPSocket
|
||||
def initialize(host : String, port, dns_timeout = nil, connect_timeout = nil, blocking = false, family = Socket::Family::UNSPEC)
|
||||
Addrinfo.tcp(host, port, timeout: dns_timeout, family: family) do |addrinfo|
|
||||
super(addrinfo.family, addrinfo.type, addrinfo.protocol, blocking)
|
||||
connect(addrinfo, timeout: connect_timeout) do |error|
|
||||
close
|
||||
error
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
class HTTPClient < HTTP::Client
|
||||
def initialize(uri : URI, tls : TLSContext = nil, allow_auto_reconnect : Bool = true)
|
||||
tls = HTTP::Client.tls_flag(uri, tls)
|
||||
host = HTTP::Client.validate_host(uri)
|
||||
|
||||
super(host, uri.port, tls)
|
||||
|
||||
@reconnect = allow_auto_reconnect
|
||||
end
|
||||
|
||||
def initialize(uri : URI, tls : TLSContext = nil, force_resolve : Socket::Family = Socket::Family::UNSPEC)
|
||||
tls = HTTP::Client.tls_flag(uri, tls)
|
||||
|
||||
{% if flag?(:without_openssl) %}
|
||||
if tls
|
||||
raise "HTTP::Client TLS is disabled because `-D without_openssl` was passed at compile time"
|
||||
end
|
||||
@tls = nil
|
||||
{% else %}
|
||||
@tls = case tls
|
||||
when true
|
||||
OpenSSL::SSL::Context::Client.new
|
||||
when OpenSSL::SSL::Context::Client
|
||||
tls
|
||||
when false, nil
|
||||
nil
|
||||
end
|
||||
{% end %}
|
||||
|
||||
@host = HTTP::Client.validate_host(uri)
|
||||
@port = (uri.port || (@tls ? 443 : 80)).to_i
|
||||
|
||||
tcp_socket = IVTCPSocket.new(
|
||||
host: @host,
|
||||
port: @port,
|
||||
family: force_resolve,
|
||||
)
|
||||
|
||||
if tls = @tls
|
||||
begin
|
||||
@io = OpenSSL::SSL::Socket::Client.new(tcp_socket, context: tls, sync_close: true, hostname: @host.rchop('.'))
|
||||
rescue ex
|
||||
# Don't leak the TCP socket when the SSL connection failed
|
||||
tcp_socket.close
|
||||
raise ex
|
||||
end
|
||||
else
|
||||
@io = tcp_socket
|
||||
end
|
||||
|
||||
@reconnect = false
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def add_yt_headers(request)
|
||||
request.headers.delete("User-Agent") if request.headers["User-Agent"] == "Crystal"
|
||||
request.headers["User-Agent"] ||= "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
|
||||
@ -13,14 +81,21 @@ def add_yt_headers(request)
|
||||
end
|
||||
end
|
||||
|
||||
def make_client(url : URI, region = nil, force_resolve : Bool = false, force_youtube_headers : Bool = false, use_http_proxy : Bool = true)
|
||||
client = HTTP::Client.new(url)
|
||||
client.proxy = make_configured_http_proxy_client() if CONFIG.http_proxy && use_http_proxy
|
||||
|
||||
# Force the usage of a specific configured IP Family
|
||||
if force_resolve
|
||||
client.family = CONFIG.force_resolve
|
||||
client.family = Socket::Family::INET if client.family == Socket::Family::UNSPEC
|
||||
def make_client(
|
||||
url : URI,
|
||||
region = nil,
|
||||
force_resolve : Bool = false,
|
||||
force_youtube_headers : Bool = true,
|
||||
use_http_proxy : Bool = true,
|
||||
allow_auto_reconnect : Bool = true,
|
||||
)
|
||||
if CONFIG.http_proxy && use_http_proxy
|
||||
client = Invidious::HTTPClient.new(url)
|
||||
client.proxy = make_configured_http_proxy_client() if CONFIG.http_proxy && use_http_proxy
|
||||
elsif force_resolve
|
||||
client = Invidious::HTTPClient.new(url, force_resolve: CONFIG.force_resolve)
|
||||
else
|
||||
client = Invidious::HTTPClient.new(url, allow_auto_reconnect: allow_auto_reconnect)
|
||||
end
|
||||
|
||||
client.before_request { |r| add_yt_headers(r) } if url.host.try &.ends_with?("youtube.com") || force_youtube_headers
|
||||
|
@ -25,10 +25,9 @@ module Invidious::ConnectionPool
|
||||
# Streaming API for {{method.id.upcase}} request.
|
||||
# The response will have its body as an `IO` accessed via `HTTP::Client::Response#body_io`.
|
||||
def {{method.id}}(*args, **kwargs, &)
|
||||
self.checkout do | client |
|
||||
self.checkout_with_retry do | client |
|
||||
client.{{method.id}}(*args, **kwargs) do | response |
|
||||
result = yield response
|
||||
return result
|
||||
return yield response
|
||||
ensure
|
||||
response.body_io?.try &.skip_to_end
|
||||
end
|
||||
@ -38,45 +37,82 @@ module Invidious::ConnectionPool
|
||||
# Executes a {{method.id.upcase}} request.
|
||||
# The response will have its body as a `String`, accessed via `HTTP::Client::Response#body`.
|
||||
def {{method.id}}(*args, **kwargs)
|
||||
self.checkout do | client |
|
||||
self.checkout_with_retry do | client |
|
||||
return client.{{method.id}}(*args, **kwargs)
|
||||
end
|
||||
end
|
||||
{% end %}
|
||||
|
||||
# Checks out a client in the pool
|
||||
#
|
||||
# This method will NOT delete a client that has errored from the pool.
|
||||
# Use `#checkout_with_retry` to ensure that the pool does not get poisoned.
|
||||
def checkout(&)
|
||||
# If a client has been deleted from the pool
|
||||
# we won't try to release it
|
||||
client_exists_in_pool = true
|
||||
pool.checkout do |client|
|
||||
# When the HTTP::Client connection is closed, the automatic reconnection
|
||||
# feature will create a new IO to connect to the server with
|
||||
#
|
||||
# This new TCP IO will be a direct connection to the server and will not go
|
||||
# through the proxy. As such we'll need to reinitialize the proxy connection
|
||||
client.proxy = make_configured_http_proxy_client() if @reinitialize_proxy && CONFIG.http_proxy
|
||||
|
||||
http_client = pool.checkout
|
||||
response = yield client
|
||||
|
||||
# When the HTTP::Client connection is closed, the automatic reconnection
|
||||
# feature will create a new IO to connect to the server with
|
||||
#
|
||||
# This new TCP IO will be a direct connection to the server and will not go
|
||||
# through the proxy. As such we'll need to reinitialize the proxy connection
|
||||
|
||||
http_client.proxy = make_configured_http_proxy_client() if @reinitialize_proxy && CONFIG.http_proxy
|
||||
|
||||
response = yield http_client
|
||||
rescue ex : DB::PoolTimeout
|
||||
# Failed to checkout a client
|
||||
raise ConnectionPool::PoolCheckoutError.new(ex.message)
|
||||
rescue ex
|
||||
# An error occurred with the client itself.
|
||||
# Delete the client from the pool and close the connection
|
||||
if http_client
|
||||
client_exists_in_pool = false
|
||||
@pool.delete(http_client)
|
||||
http_client.close
|
||||
return response
|
||||
rescue ex : DB::PoolTimeout
|
||||
# Failed to checkout a client
|
||||
raise ConnectionPool::PoolCheckoutError.new(ex.message)
|
||||
end
|
||||
end
|
||||
|
||||
# Raise exception for outer methods to handle
|
||||
raise ConnectionPool::Error.new(ex.message, cause: ex)
|
||||
ensure
|
||||
pool.release(http_client) if http_client && client_exists_in_pool
|
||||
# Checks out a client from the pool; retries only if a connection is lost or refused
|
||||
#
|
||||
# Will cycle through all of the existing connections at no delay, but any new connections
|
||||
# that is created will be subject to a delay.
|
||||
#
|
||||
# The first attempt to make a new connection will not have the delay, but all subsequent
|
||||
# attempts will.
|
||||
#
|
||||
# To `DB::Pool#retry`:
|
||||
# - `DB::PoolResourceLost` means that the connection has been lost
|
||||
# and should be deleted from the pool.
|
||||
#
|
||||
# - `DB::PoolResourceRefused` means a new connection was intended to be created but failed
|
||||
# but the client can be safely released back into the pool to try again later with
|
||||
#
|
||||
# See the following code of `crystal-db` for more information
|
||||
#
|
||||
# https://github.com/crystal-lang/crystal-db/blob/023dc5de90c11927656fc747601c5f08ea3c906f/src/db/pool.cr#L191
|
||||
# https://github.com/crystal-lang/crystal-db/blob/023dc5de90c11927656fc747601c5f08ea3c906f/src/db/pool_statement.cr#L41
|
||||
# https://github.com/crystal-lang/crystal-db/blob/023dc5de90c11927656fc747601c5f08ea3c906f/src/db/pool_prepared_statement.cr#L13
|
||||
#
|
||||
def checkout_with_retry(&)
|
||||
@pool.retry do
|
||||
self.checkout do |client|
|
||||
begin
|
||||
return yield client
|
||||
rescue ex : IO::TimeoutError
|
||||
LOGGER.trace("Client: #{client} has failed to complete the request. Retrying with a new client")
|
||||
raise DB::PoolResourceRefused.new
|
||||
rescue ex : InfoException
|
||||
raise ex
|
||||
rescue ex : Exception
|
||||
# Any other errors should cause the client to be deleted from the pool
|
||||
|
||||
# This means that the client is closed and needs to be deleted from the pool
|
||||
# due its inability to reconnect
|
||||
if ex.message == "This HTTP::Client cannot be reconnected"
|
||||
LOGGER.trace("Checked out client is closed and cannot be reconnected. Trying the next retry attempt...")
|
||||
else
|
||||
LOGGER.error("Client: #{client} has encountered an error: #{ex} #{ex.message} and will be removed from the pool")
|
||||
end
|
||||
|
||||
raise DB::PoolResourceLost(HTTP::Client).new(client)
|
||||
end
|
||||
end
|
||||
rescue ex : DB::PoolRetryAttemptsExceeded
|
||||
raise PoolRetryAttemptsExceeded.new
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@ -87,6 +123,10 @@ module Invidious::ConnectionPool
|
||||
class PoolCheckoutError < Error
|
||||
end
|
||||
|
||||
# Raised when too many retries
|
||||
class PoolRetryAttemptsExceeded < Error
|
||||
end
|
||||
|
||||
# Mapping of subdomain => Invidious::ConnectionPool::Pool
|
||||
# This is needed as we may need to access arbitrary subdomains of ytimg
|
||||
private YTIMG_POOLS = {} of String => ConnectionPool::Pool
|
||||
|
@ -1,53 +1,3 @@
|
||||
# Override of the TCPSocket and HTTP::Client classes in order to allow an
|
||||
# IP family to be selected for domains that resolve to both IPv4 and
|
||||
# IPv6 addresses.
|
||||
#
|
||||
class TCPSocket
|
||||
def initialize(host, port, dns_timeout = nil, connect_timeout = nil, blocking = false, family = Socket::Family::UNSPEC)
|
||||
Addrinfo.tcp(host, port, timeout: dns_timeout, family: family) do |addrinfo|
|
||||
super(addrinfo.family, addrinfo.type, addrinfo.protocol, blocking)
|
||||
connect(addrinfo, timeout: connect_timeout) do |error|
|
||||
close
|
||||
error
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# :ditto:
|
||||
class HTTP::Client
|
||||
property family : Socket::Family = Socket::Family::UNSPEC
|
||||
|
||||
private def io
|
||||
io = @io
|
||||
return io if io
|
||||
unless @reconnect
|
||||
raise "This HTTP::Client cannot be reconnected"
|
||||
end
|
||||
|
||||
hostname = @host.starts_with?('[') && @host.ends_with?(']') ? @host[1..-2] : @host
|
||||
io = TCPSocket.new hostname, @port, @dns_timeout, @connect_timeout, family: @family
|
||||
io.read_timeout = @read_timeout if @read_timeout
|
||||
io.write_timeout = @write_timeout if @write_timeout
|
||||
io.sync = false
|
||||
|
||||
{% if !flag?(:without_openssl) %}
|
||||
if tls = @tls
|
||||
tcp_socket = io
|
||||
begin
|
||||
io = OpenSSL::SSL::Socket::Client.new(tcp_socket, context: tls, sync_close: true, hostname: @host.rchop('.'))
|
||||
rescue exc
|
||||
# don't leak the TCP socket when the SSL connection failed
|
||||
tcp_socket.close
|
||||
raise exc
|
||||
end
|
||||
end
|
||||
{% end %}
|
||||
|
||||
@io = io
|
||||
end
|
||||
end
|
||||
|
||||
# Mute the ClientError exception raised when a connection is flushed.
|
||||
# This happends when the connection is unexpectedly closed by the client.
|
||||
#
|
||||
|
Loading…
x
Reference in New Issue
Block a user