Fix build with boost ASIO 1.87. Support boost 1.66+

This commit is contained in:
Lee *!* Clagett 2024-12-17 16:40:15 -05:00
parent 2e8a128c75
commit 4344f97255
37 changed files with 397 additions and 375 deletions

View file

@ -31,11 +31,12 @@
//
#include <boost/asio/post.hpp>
#include <boost/foreach.hpp>
#include <boost/uuid/random_generator.hpp>
#include <boost/chrono.hpp>
#include <boost/utility/value_init.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <boost/date_time/posix_time/posix_time.hpp> // TODO
#include <boost/thread/condition_variable.hpp> // TODO
@ -145,23 +146,19 @@ namespace net_utils
if (m_state.timers.general.wait_expire) {
m_state.timers.general.cancel_expire = true;
m_state.timers.general.reset_expire = true;
ec_t ec;
m_timers.general.expires_from_now(
m_timers.general.expires_after(
std::min(
duration + (add ? m_timers.general.expires_from_now() : duration_t{}),
duration + (add ? (m_timers.general.expiry() - std::chrono::steady_clock::now()) : duration_t{}),
get_default_timeout()
),
ec
)
);
}
else {
ec_t ec;
m_timers.general.expires_from_now(
m_timers.general.expires_after(
std::min(
duration + (add ? m_timers.general.expires_from_now() : duration_t{}),
duration + (add ? (m_timers.general.expiry() - std::chrono::steady_clock::now()) : duration_t{}),
get_default_timeout()
),
ec
)
);
async_wait_timer();
}
@ -202,8 +199,7 @@ namespace net_utils
return;
m_state.timers.general.cancel_expire = true;
m_state.timers.general.reset_expire = false;
ec_t ec;
m_timers.general.cancel(ec);
m_timers.general.cancel();
}
template<typename T>
@ -225,7 +221,8 @@ namespace net_utils
m_state.data.read.buffer.size()
),
boost::asio::transfer_exactly(epee::net_utils::get_ssl_magic_size()),
m_strand.wrap(
boost::asio::bind_executor(
m_strand,
[this, self](const ec_t &ec, size_t bytes_transferred){
std::lock_guard<std::mutex> guard(m_state.lock);
m_state.socket.wait_read = false;
@ -246,7 +243,8 @@ namespace net_utils
) {
m_state.ssl.enabled = false;
m_state.socket.handle_read = true;
connection_basic::strand_.post(
boost::asio::post(
connection_basic::strand_,
[this, self, bytes_transferred]{
bool success = m_handler.handle_recv(
reinterpret_cast<char *>(m_state.data.read.buffer.data()),
@ -304,7 +302,8 @@ namespace net_utils
static_cast<shared_state&>(
connection_basic::get_state()
).ssl_options().configure(connection_basic::socket_, handshake);
m_strand.post(
boost::asio::post(
m_strand,
[this, self, on_handshake]{
connection_basic::socket_.async_handshake(
handshake,
@ -313,7 +312,7 @@ namespace net_utils
m_state.ssl.forced ? 0 :
epee::net_utils::get_ssl_magic_size()
),
m_strand.wrap(on_handshake)
boost::asio::bind_executor(m_strand, on_handshake)
);
}
);
@ -345,8 +344,7 @@ namespace net_utils
};
const auto duration = calc_duration();
if (duration > duration_t{}) {
ec_t ec;
m_timers.throttle.in.expires_from_now(duration, ec);
m_timers.throttle.in.expires_after(duration);
m_state.timers.throttle.in.wait_expire = true;
m_timers.throttle.in.async_wait([this, self](const ec_t &ec){
std::lock_guard<std::mutex> guard(m_state.lock);
@ -401,7 +399,8 @@ namespace net_utils
// writes until the connection terminates without deadlocking waiting
// for handle_recv.
m_state.socket.handle_read = true;
connection_basic::strand_.post(
boost::asio::post(
connection_basic::strand_,
[this, self, bytes_transferred]{
bool success = m_handler.handle_recv(
reinterpret_cast<char *>(m_state.data.read.buffer.data()),
@ -428,17 +427,18 @@ namespace net_utils
m_state.data.read.buffer.data(),
m_state.data.read.buffer.size()
),
m_strand.wrap(on_read)
boost::asio::bind_executor(m_strand, on_read)
);
else
m_strand.post(
boost::asio::post(
m_strand,
[this, self, on_read]{
connection_basic::socket_.async_read_some(
boost::asio::buffer(
m_state.data.read.buffer.data(),
m_state.data.read.buffer.size()
),
m_strand.wrap(on_read)
boost::asio::bind_executor(m_strand, on_read)
);
}
);
@ -473,8 +473,7 @@ namespace net_utils
};
const auto duration = calc_duration();
if (duration > duration_t{}) {
ec_t ec;
m_timers.throttle.out.expires_from_now(duration, ec);
m_timers.throttle.out.expires_after(duration);
m_state.timers.throttle.out.wait_expire = true;
m_timers.throttle.out.async_wait([this, self](const ec_t &ec){
std::lock_guard<std::mutex> guard(m_state.lock);
@ -539,10 +538,11 @@ namespace net_utils
m_state.data.write.queue.back().data(),
m_state.data.write.queue.back().size()
),
m_strand.wrap(on_write)
boost::asio::bind_executor(m_strand, on_write)
);
else
m_strand.post(
boost::asio::post(
m_strand,
[this, self, on_write]{
boost::asio::async_write(
connection_basic::socket_,
@ -550,7 +550,7 @@ namespace net_utils
m_state.data.write.queue.back().data(),
m_state.data.write.queue.back().size()
),
m_strand.wrap(on_write)
boost::asio::bind_executor(m_strand, on_write)
);
}
);
@ -587,10 +587,11 @@ namespace net_utils
terminate();
}
};
m_strand.post(
boost::asio::post(
m_strand,
[this, self, on_shutdown]{
connection_basic::socket_.async_shutdown(
m_strand.wrap(on_shutdown)
boost::asio::bind_executor(m_strand, on_shutdown)
);
}
);
@ -605,15 +606,13 @@ namespace net_utils
wait_socket = m_state.socket.cancel_handshake = true;
if (m_state.timers.throttle.in.wait_expire) {
m_state.timers.throttle.in.cancel_expire = true;
ec_t ec;
m_timers.throttle.in.cancel(ec);
m_timers.throttle.in.cancel();
}
if (m_state.socket.wait_read)
wait_socket = m_state.socket.cancel_read = true;
if (m_state.timers.throttle.out.wait_expire) {
m_state.timers.throttle.out.cancel_expire = true;
ec_t ec;
m_timers.throttle.out.cancel(ec);
m_timers.throttle.out.cancel();
}
if (m_state.socket.wait_write)
wait_socket = m_state.socket.cancel_write = true;
@ -860,7 +859,7 @@ namespace net_utils
ipv4_network_address{
uint32_t{
boost::asio::detail::socket_ops::host_to_network_long(
endpoint.address().to_v4().to_ulong()
endpoint.address().to_v4().to_uint()
)
},
endpoint.port()
@ -938,7 +937,8 @@ namespace net_utils
ssl_support_t ssl_support
):
connection(
std::move(socket_t{io_context}),
io_context,
socket_t{io_context},
std::move(shared_state),
connection_type,
ssl_support
@ -948,15 +948,16 @@ namespace net_utils
template<typename T>
connection<T>::connection(
io_context_t &io_context,
socket_t &&socket,
std::shared_ptr<shared_state> shared_state,
t_connection_type connection_type,
ssl_support_t ssl_support
):
connection_basic(std::move(socket), shared_state, ssl_support),
connection_basic(io_context, std::move(socket), shared_state, ssl_support),
m_handler(this, *shared_state, m_conn_context),
m_connection_type(connection_type),
m_io_context{GET_IO_SERVICE(connection_basic::socket_)},
m_io_context{io_context},
m_strand{m_io_context},
m_timers{m_io_context}
{
@ -1075,7 +1076,7 @@ namespace net_utils
return false;
auto self = connection<T>::shared_from_this();
++m_state.protocol.wait_callback;
connection_basic::strand_.post([this, self]{
boost::asio::post(connection_basic::strand_, [this, self]{
m_handler.handle_qued_callback();
std::lock_guard<std::mutex> guard(m_state.lock);
--m_state.protocol.wait_callback;
@ -1088,7 +1089,7 @@ namespace net_utils
}
template<typename T>
typename connection<T>::io_context_t &connection<T>::get_io_service()
typename connection<T>::io_context_t &connection<T>::get_io_context()
{
return m_io_context;
}
@ -1128,10 +1129,10 @@ namespace net_utils
template<class t_protocol_handler>
boosted_tcp_server<t_protocol_handler>::boosted_tcp_server( t_connection_type connection_type ) :
m_state(std::make_shared<typename connection<t_protocol_handler>::shared_state>()),
m_io_service_local_instance(new worker()),
io_service_(m_io_service_local_instance->io_service),
acceptor_(io_service_),
acceptor_ipv6(io_service_),
m_io_context_local_instance(new worker()),
io_context_(m_io_context_local_instance->io_context),
acceptor_(io_context_),
acceptor_ipv6(io_context_),
default_remote(),
m_stop_signal_sent(false), m_port(0),
m_threads_count(0),
@ -1145,11 +1146,11 @@ namespace net_utils
}
template<class t_protocol_handler>
boosted_tcp_server<t_protocol_handler>::boosted_tcp_server(boost::asio::io_service& extarnal_io_service, t_connection_type connection_type) :
boosted_tcp_server<t_protocol_handler>::boosted_tcp_server(boost::asio::io_context& extarnal_io_context, t_connection_type connection_type) :
m_state(std::make_shared<typename connection<t_protocol_handler>::shared_state>()),
io_service_(extarnal_io_service),
acceptor_(io_service_),
acceptor_ipv6(io_service_),
io_context_(extarnal_io_context),
acceptor_(io_context_),
acceptor_ipv6(io_context_),
default_remote(),
m_stop_signal_sent(false), m_port(0),
m_threads_count(0),
@ -1196,24 +1197,27 @@ namespace net_utils
std::string ipv4_failed = "";
std::string ipv6_failed = "";
boost::asio::ip::tcp::resolver resolver(io_context_);
try
{
boost::asio::ip::tcp::resolver resolver(io_service_);
boost::asio::ip::tcp::resolver::query query(address, boost::lexical_cast<std::string>(port), boost::asio::ip::tcp::resolver::query::canonical_name);
boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
acceptor_.open(endpoint.protocol());
const auto results = resolver.resolve(
address, boost::lexical_cast<std::string>(port), boost::asio::ip::tcp::resolver::canonical_name
);
acceptor_.open(results.begin()->endpoint().protocol());
#if !defined(_WIN32)
acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
#endif
acceptor_.bind(endpoint);
acceptor_.bind(*results.begin());
acceptor_.listen();
boost::asio::ip::tcp::endpoint binded_endpoint = acceptor_.local_endpoint();
m_port = binded_endpoint.port();
MDEBUG("start accept (IPv4)");
new_connection_.reset(new connection<t_protocol_handler>(io_service_, m_state, m_connection_type, m_state->ssl_options().support));
new_connection_.reset(new connection<t_protocol_handler>(io_context_, m_state, m_connection_type, m_state->ssl_options().support));
acceptor_.async_accept(new_connection_->socket(),
boost::bind(&boosted_tcp_server<t_protocol_handler>::handle_accept_ipv4, this,
boost::asio::placeholders::error));
boost::bind(&boosted_tcp_server<t_protocol_handler>::handle_accept_ipv4, this,
boost::asio::placeholders::error));
}
catch (const std::exception &e)
{
@ -1234,23 +1238,25 @@ namespace net_utils
try
{
if (port_ipv6 == 0) port_ipv6 = port; // default arg means bind to same port as ipv4
boost::asio::ip::tcp::resolver resolver(io_service_);
boost::asio::ip::tcp::resolver::query query(address_ipv6, boost::lexical_cast<std::string>(port_ipv6), boost::asio::ip::tcp::resolver::query::canonical_name);
boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
acceptor_ipv6.open(endpoint.protocol());
const auto results = resolver.resolve(
address_ipv6, boost::lexical_cast<std::string>(port_ipv6), boost::asio::ip::tcp::resolver::canonical_name
);
acceptor_ipv6.open(results.begin()->endpoint().protocol());
#if !defined(_WIN32)
acceptor_ipv6.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
#endif
acceptor_ipv6.set_option(boost::asio::ip::v6_only(true));
acceptor_ipv6.bind(endpoint);
acceptor_ipv6.bind(*results.begin());
acceptor_ipv6.listen();
boost::asio::ip::tcp::endpoint binded_endpoint = acceptor_ipv6.local_endpoint();
m_port_ipv6 = binded_endpoint.port();
MDEBUG("start accept (IPv6)");
new_connection_ipv6.reset(new connection<t_protocol_handler>(io_service_, m_state, m_connection_type, m_state->ssl_options().support));
new_connection_ipv6.reset(new connection<t_protocol_handler>(io_context_, m_state, m_connection_type, m_state->ssl_options().support));
acceptor_ipv6.async_accept(new_connection_ipv6->socket(),
boost::bind(&boosted_tcp_server<t_protocol_handler>::handle_accept_ipv6, this,
boost::asio::placeholders::error));
boost::bind(&boosted_tcp_server<t_protocol_handler>::handle_accept_ipv6, this,
boost::asio::placeholders::error));
}
catch (const std::exception &e)
{
@ -1314,7 +1320,7 @@ namespace net_utils
{
try
{
io_service_.run();
io_context_.run();
return true;
}
catch(const std::exception& ex)
@ -1358,7 +1364,7 @@ namespace net_utils
while(!m_stop_signal_sent)
{
// Create a pool of threads to run all of the io_services.
// Create a pool of threads to run all of the io_contexts.
CRITICAL_REGION_BEGIN(m_threads_lock);
for (std::size_t i = 0; i < threads_count; ++i)
{
@ -1450,7 +1456,7 @@ namespace net_utils
}
connections_.clear();
connections_mutex.unlock();
io_service_.stop();
io_context_.stop();
CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::send_stop_signal()", void());
}
//---------------------------------------------------------------------------------
@ -1497,7 +1503,7 @@ namespace net_utils
(*current_new_connection)->setRpcStation(); // hopefully this is not needed actually
}
connection_ptr conn(std::move((*current_new_connection)));
(*current_new_connection).reset(new connection<t_protocol_handler>(io_service_, m_state, m_connection_type, conn->get_ssl_support()));
(*current_new_connection).reset(new connection<t_protocol_handler>(io_context_, m_state, m_connection_type, conn->get_ssl_support()));
current_acceptor->async_accept((*current_new_connection)->socket(),
boost::bind(accept_function_pointer, this,
boost::asio::placeholders::error));
@ -1532,7 +1538,7 @@ namespace net_utils
assert(m_state != nullptr); // always set in constructor
_erro("Some problems at accept: " << e.message() << ", connections_count = " << m_state->sock_count);
misc_utils::sleep_no_w(100);
(*current_new_connection).reset(new connection<t_protocol_handler>(io_service_, m_state, m_connection_type, (*current_new_connection)->get_ssl_support()));
(*current_new_connection).reset(new connection<t_protocol_handler>(io_context_, m_state, m_connection_type, (*current_new_connection)->get_ssl_support()));
current_acceptor->async_accept((*current_new_connection)->socket(),
boost::bind(accept_function_pointer, this,
boost::asio::placeholders::error));
@ -1541,9 +1547,9 @@ namespace net_utils
template<class t_protocol_handler>
bool boosted_tcp_server<t_protocol_handler>::add_connection(t_connection_context& out, boost::asio::ip::tcp::socket&& sock, network_address real_remote, epee::net_utils::ssl_support_t ssl_support)
{
if(std::addressof(get_io_service()) == std::addressof(GET_IO_SERVICE(sock)))
if(std::addressof(get_io_context()) == std::addressof(sock.get_executor().context()))
{
connection_ptr conn(new connection<t_protocol_handler>(std::move(sock), m_state, m_connection_type, ssl_support));
connection_ptr conn(new connection<t_protocol_handler>(io_context_, std::move(sock), m_state, m_connection_type, ssl_support));
if(conn->start(false, 1 < m_threads_count, std::move(real_remote)))
{
conn->get_context(out);
@ -1553,7 +1559,7 @@ namespace net_utils
}
else
{
MWARNING(out << " was not added, socket/io_service mismatch");
MWARNING(out << " was not added, socket/io_context mismatch");
}
return false;
}
@ -1566,7 +1572,7 @@ namespace net_utils
sock_.open(remote_endpoint.protocol());
if(bind_ip != "0.0.0.0" && bind_ip != "0" && bind_ip != "" )
{
boost::asio::ip::tcp::endpoint local_endpoint(boost::asio::ip::address::from_string(bind_ip.c_str()), 0);
boost::asio::ip::tcp::endpoint local_endpoint(boost::asio::ip::make_address(bind_ip), 0);
boost::system::error_code ec;
sock_.bind(local_endpoint, ec);
if (ec)
@ -1661,7 +1667,7 @@ namespace net_utils
{
TRY_ENTRY();
connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_state, m_connection_type, ssl_support) );
connection_ptr new_connection_l(new connection<t_protocol_handler>(io_context_, m_state, m_connection_type, ssl_support) );
connections_mutex.lock();
connections_.insert(new_connection_l);
MDEBUG("connections_ size now " << connections_.size());
@ -1671,14 +1677,16 @@ namespace net_utils
bool try_ipv6 = false;
boost::asio::ip::tcp::resolver resolver(io_service_);
boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), adr, port, boost::asio::ip::tcp::resolver::query::canonical_name);
boost::asio::ip::tcp::resolver resolver(io_context_);
boost::asio::ip::tcp::resolver::results_type results{};
boost::system::error_code resolve_error;
boost::asio::ip::tcp::resolver::iterator iterator;
try
{
//resolving ipv4 address as ipv6 throws, catch here and move on
iterator = resolver.resolve(query, resolve_error);
results = resolver.resolve(
boost::asio::ip::tcp::v4(), adr, port, boost::asio::ip::tcp::resolver::canonical_name, resolve_error
);
}
catch (const boost::system::system_error& e)
{
@ -1696,8 +1704,7 @@ namespace net_utils
std::string bind_ip_to_use;
boost::asio::ip::tcp::resolver::iterator end;
if(iterator == end)
if(results.empty())
{
if (!m_use_ipv6)
{
@ -1717,11 +1724,11 @@ namespace net_utils
if (try_ipv6)
{
boost::asio::ip::tcp::resolver::query query6(boost::asio::ip::tcp::v6(), adr, port, boost::asio::ip::tcp::resolver::query::canonical_name);
results = resolver.resolve(
boost::asio::ip::tcp::v6(), adr, port, boost::asio::ip::tcp::resolver::canonical_name, resolve_error
);
iterator = resolver.resolve(query6, resolve_error);
if(iterator == end)
if(results.empty())
{
_erro("Failed to resolve " << adr);
return false;
@ -1741,6 +1748,8 @@ namespace net_utils
}
const auto iterator = results.begin();
MDEBUG("Trying to connect to " << adr << ":" << port << ", bind_ip = " << bind_ip_to_use);
//boost::asio::ip::tcp::endpoint remote_endpoint(boost::asio::ip::address::from_string(addr.c_str()), port);
@ -1767,7 +1776,6 @@ namespace net_utils
if (r)
{
new_connection_l->get_context(conn_context);
//new_connection_l.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_pfilter));
}
else
{
@ -1786,7 +1794,7 @@ namespace net_utils
bool boosted_tcp_server<t_protocol_handler>::connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeout, const t_callback &cb, const std::string& bind_ip, epee::net_utils::ssl_support_t ssl_support)
{
TRY_ENTRY();
connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_state, m_connection_type, ssl_support) );
connection_ptr new_connection_l(new connection<t_protocol_handler>(io_context_, m_state, m_connection_type, ssl_support) );
connections_mutex.lock();
connections_.insert(new_connection_l);
MDEBUG("connections_ size now " << connections_.size());
@ -1796,14 +1804,16 @@ namespace net_utils
bool try_ipv6 = false;
boost::asio::ip::tcp::resolver resolver(io_service_);
boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), adr, port, boost::asio::ip::tcp::resolver::query::canonical_name);
boost::asio::ip::tcp::resolver resolver(io_context_);
boost::asio::ip::tcp::resolver::results_type results{};
boost::system::error_code resolve_error;
boost::asio::ip::tcp::resolver::iterator iterator;
try
{
//resolving ipv4 address as ipv6 throws, catch here and move on
iterator = resolver.resolve(query, resolve_error);
results = resolver.resolve(
boost::asio::ip::tcp::v4(), adr, port, boost::asio::ip::tcp::resolver::canonical_name, resolve_error
);
}
catch (const boost::system::system_error& e)
{
@ -1819,8 +1829,7 @@ namespace net_utils
throw;
}
boost::asio::ip::tcp::resolver::iterator end;
if(iterator == end)
if(results.empty())
{
if (!try_ipv6)
{
@ -1835,24 +1844,23 @@ namespace net_utils
if (try_ipv6)
{
boost::asio::ip::tcp::resolver::query query6(boost::asio::ip::tcp::v6(), adr, port, boost::asio::ip::tcp::resolver::query::canonical_name);
results = resolver.resolve(
boost::asio::ip::tcp::v6(), adr, port, boost::asio::ip::tcp::resolver::canonical_name, resolve_error
);
iterator = resolver.resolve(query6, resolve_error);
if(iterator == end)
if(results.empty())
{
_erro("Failed to resolve " << adr);
return false;
}
}
boost::asio::ip::tcp::endpoint remote_endpoint(*iterator);
boost::asio::ip::tcp::endpoint remote_endpoint(*results.begin());
sock_.open(remote_endpoint.protocol());
if(bind_ip != "0.0.0.0" && bind_ip != "0" && bind_ip != "" )
{
boost::asio::ip::tcp::endpoint local_endpoint(boost::asio::ip::address::from_string(bind_ip.c_str()), 0);
boost::asio::ip::tcp::endpoint local_endpoint(boost::asio::ip::make_address(bind_ip.c_str()), 0);
boost::system::error_code ec;
sock_.bind(local_endpoint, ec);
if (ec)
@ -1864,7 +1872,7 @@ namespace net_utils
}
}
boost::shared_ptr<boost::asio::deadline_timer> sh_deadline(new boost::asio::deadline_timer(io_service_));
boost::shared_ptr<boost::asio::deadline_timer> sh_deadline(new boost::asio::deadline_timer(io_context_));
//start deadline
sh_deadline->expires_from_now(boost::posix_time::milliseconds(conn_timeout));
sh_deadline->async_wait([=](const boost::system::error_code& error)