abstract_tcp_server2: timeout on RPC connections

This commit is contained in:
moneromooo-monero 2018-05-26 19:34:13 +01:00
parent ccaa666cf9
commit fd9019b37d
No known key found for this signature in database
GPG Key ID: 686F07454D6CEFC3
2 changed files with 68 additions and 1 deletions

View File

@ -135,6 +135,11 @@ namespace net_utils
/// Handle completion of a write operation. /// Handle completion of a write operation.
void handle_write(const boost::system::error_code& e, size_t cb); void handle_write(const boost::system::error_code& e, size_t cb);
/// reset connection timeout timer and callback
void reset_timer(boost::posix_time::milliseconds ms, bool add);
boost::posix_time::milliseconds get_default_time() const;
boost::posix_time::milliseconds get_timeout_from_bytes_read(size_t bytes) const;
/// Buffer for incoming data. /// Buffer for incoming data.
boost::array<char, 8192> buffer_; boost::array<char, 8192> buffer_;
//boost::array<char, 1024> buffer_; //boost::array<char, 1024> buffer_;
@ -158,6 +163,9 @@ namespace net_utils
boost::mutex m_throttle_speed_in_mutex; boost::mutex m_throttle_speed_in_mutex;
boost::mutex m_throttle_speed_out_mutex; boost::mutex m_throttle_speed_out_mutex;
boost::asio::deadline_timer m_timer;
bool m_local;
public: public:
void setRpcStation(); void setRpcStation();
}; };

View File

@ -44,6 +44,7 @@
#include <boost/thread/thread.hpp> // TODO #include <boost/thread/thread.hpp> // TODO
#include <boost/thread/condition_variable.hpp> // TODO #include <boost/thread/condition_variable.hpp> // TODO
#include "misc_language.h" #include "misc_language.h"
#include "net/local_ip.h"
#include "pragma_comp_defs.h" #include "pragma_comp_defs.h"
#include <sstream> #include <sstream>
@ -55,6 +56,10 @@
#undef MONERO_DEFAULT_LOG_CATEGORY #undef MONERO_DEFAULT_LOG_CATEGORY
#define MONERO_DEFAULT_LOG_CATEGORY "net" #define MONERO_DEFAULT_LOG_CATEGORY "net"
#define DEFAULT_TIMEOUT_MS_LOCAL boost::posix_time::milliseconds(120000) // 2 minutes
#define DEFAULT_TIMEOUT_MS_REMOTE boost::posix_time::milliseconds(10000) // 10 seconds
#define TIMEOUT_EXTRA_MS_PER_BYTE 0.2
PRAGMA_WARNING_PUSH PRAGMA_WARNING_PUSH
namespace epee namespace epee
{ {
@ -79,7 +84,9 @@ PRAGMA_WARNING_DISABLE_VS(4355)
m_pfilter( pfilter ), m_pfilter( pfilter ),
m_connection_type( connection_type ), m_connection_type( connection_type ),
m_throttle_speed_in("speed_in", "throttle_speed_in"), m_throttle_speed_in("speed_in", "throttle_speed_in"),
m_throttle_speed_out("speed_out", "throttle_speed_out") m_throttle_speed_out("speed_out", "throttle_speed_out"),
m_timer(io_service),
m_local(false)
{ {
MDEBUG("test, connection constructor set m_connection_type="<<m_connection_type); MDEBUG("test, connection constructor set m_connection_type="<<m_connection_type);
} }
@ -139,6 +146,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
context = boost::value_initialized<t_connection_context>(); context = boost::value_initialized<t_connection_context>();
const unsigned long ip_{boost::asio::detail::socket_ops::host_to_network_long(remote_ep.address().to_v4().to_ulong())}; const unsigned long ip_{boost::asio::detail::socket_ops::host_to_network_long(remote_ep.address().to_v4().to_ulong())};
m_local = epee::net_utils::is_ip_loopback(ip_);
// create a random uuid // create a random uuid
boost::uuids::uuid random_uuid; boost::uuids::uuid random_uuid;
@ -159,6 +167,8 @@ PRAGMA_WARNING_DISABLE_VS(4355)
m_protocol_handler.after_init_connection(); m_protocol_handler.after_init_connection();
reset_timer(get_default_time(), false);
socket_.async_read_some(boost::asio::buffer(buffer_), socket_.async_read_some(boost::asio::buffer(buffer_),
strand_.wrap( strand_.wrap(
boost::bind(&connection<t_protocol_handler>::handle_read, self, boost::bind(&connection<t_protocol_handler>::handle_read, self,
@ -304,6 +314,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
delay *= 0.5; delay *= 0.5;
if (delay > 0) { if (delay > 0) {
long int ms = (long int)(delay * 100); long int ms = (long int)(delay * 100);
reset_timer(boost::posix_time::milliseconds(ms + 1), true);
boost::this_thread::sleep_for(boost::chrono::milliseconds(ms)); boost::this_thread::sleep_for(boost::chrono::milliseconds(ms));
} }
} while(delay > 0); } while(delay > 0);
@ -329,6 +340,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
shutdown(); shutdown();
}else }else
{ {
reset_timer(get_timeout_from_bytes_read(bytes_transferred), false);
socket_.async_read_some(boost::asio::buffer(buffer_), socket_.async_read_some(boost::asio::buffer(buffer_),
strand_.wrap( strand_.wrap(
boost::bind(&connection<t_protocol_handler>::handle_read, connection<t_protocol_handler>::shared_from_this(), boost::bind(&connection<t_protocol_handler>::handle_read, connection<t_protocol_handler>::shared_from_this(),
@ -539,6 +551,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
do_send_handler_write( ptr , size_now ); // (((H))) do_send_handler_write( ptr , size_now ); // (((H)))
CHECK_AND_ASSERT_MES( size_now == m_send_que.front().size(), false, "Unexpected queue size"); CHECK_AND_ASSERT_MES( size_now == m_send_que.front().size(), false, "Unexpected queue size");
reset_timer(get_default_time(), false);
boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), size_now ) , boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), size_now ) ,
//strand_.wrap( //strand_.wrap(
boost::bind(&connection<t_protocol_handler>::handle_write, self, _1, _2) boost::bind(&connection<t_protocol_handler>::handle_write, self, _1, _2)
@ -557,9 +570,53 @@ PRAGMA_WARNING_DISABLE_VS(4355)
} // do_send_chunk } // do_send_chunk
//--------------------------------------------------------------------------------- //---------------------------------------------------------------------------------
template<class t_protocol_handler> template<class t_protocol_handler>
boost::posix_time::milliseconds connection<t_protocol_handler>::get_default_time() const
{
if (m_local)
return DEFAULT_TIMEOUT_MS_LOCAL;
else
return DEFAULT_TIMEOUT_MS_REMOTE;
}
//---------------------------------------------------------------------------------
template<class t_protocol_handler>
boost::posix_time::milliseconds connection<t_protocol_handler>::get_timeout_from_bytes_read(size_t bytes) const
{
boost::posix_time::milliseconds ms = (boost::posix_time::milliseconds)(unsigned)(bytes * TIMEOUT_EXTRA_MS_PER_BYTE);
ms += m_timer.expires_from_now();
if (ms > get_default_time())
ms = get_default_time();
return ms;
}
//---------------------------------------------------------------------------------
template<class t_protocol_handler>
void connection<t_protocol_handler>::reset_timer(boost::posix_time::milliseconds ms, bool add)
{
if (m_connection_type != e_connection_type_RPC)
return;
MTRACE("Setting " << ms << " expiry");
auto self = safe_shared_from_this();
if(!self)
{
MERROR("Resetting timer on a dead object");
return;
}
if (add)
ms += m_timer.expires_from_now();
m_timer.expires_from_now(ms);
m_timer.async_wait([=](const boost::system::error_code& ec)
{
if(ec == boost::asio::error::operation_aborted)
return;
MDEBUG("Connection timeout, closing");
self->close();
});
}
//---------------------------------------------------------------------------------
template<class t_protocol_handler>
bool connection<t_protocol_handler>::shutdown() bool connection<t_protocol_handler>::shutdown()
{ {
// Initiate graceful connection closure. // Initiate graceful connection closure.
m_timer.cancel();
boost::system::error_code ignored_ec; boost::system::error_code ignored_ec;
socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored_ec); socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored_ec);
m_was_shutdown = true; m_was_shutdown = true;
@ -572,6 +629,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
{ {
TRY_ENTRY(); TRY_ENTRY();
//_info("[sock " << socket_.native_handle() << "] Que Shutdown called."); //_info("[sock " << socket_.native_handle() << "] Que Shutdown called.");
m_timer.cancel();
size_t send_que_size = 0; size_t send_que_size = 0;
CRITICAL_REGION_BEGIN(m_send_que_lock); CRITICAL_REGION_BEGIN(m_send_que_lock);
send_que_size = m_send_que.size(); send_que_size = m_send_que.size();
@ -629,6 +687,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
}else }else
{ {
//have more data to send //have more data to send
reset_timer(get_default_time(), false);
auto size_now = m_send_que.front().size(); auto size_now = m_send_que.front().size();
MDEBUG("handle_write() NOW SENDS: packet="<<size_now<<" B" <<", from queue size="<<m_send_que.size()); MDEBUG("handle_write() NOW SENDS: packet="<<size_now<<" B" <<", from queue size="<<m_send_que.size());
if (speed_limit_is_enabled()) if (speed_limit_is_enabled())