epee: better network buffer data structure

avoids pointless allocs and memcpy
This commit is contained in:
moneromooo-monero 2018-12-06 18:04:33 +00:00
parent 6bc0c7e685
commit 85665003a7
No known key found for this signature in database
GPG key ID: 686F07454D6CEFC3
22 changed files with 325 additions and 85 deletions

View file

@ -34,6 +34,7 @@
#include <atomic>
#include "levin_base.h"
#include "buffer.h"
#include "misc_language.h"
#include "syncobj.h"
#include "misc_os_dependent.h"
@ -85,11 +86,11 @@ public:
uint64_t m_max_packet_size;
uint64_t m_invoke_timeout;
int invoke(int command, const std::string& in_buff, std::string& buff_out, boost::uuids::uuid connection_id);
int invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out, boost::uuids::uuid connection_id);
template<class callback_t>
int invoke_async(int command, const std::string& in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED);
int invoke_async(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED);
int notify(int command, const std::string& in_buff, boost::uuids::uuid connection_id);
int notify(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id);
bool close(boost::uuids::uuid connection_id);
bool update_connection_context(const t_connection_context& contxt);
bool request_callback(boost::uuids::uuid connection_id);
@ -143,7 +144,7 @@ public:
config_type& m_config;
t_connection_context& m_connection_context;
std::string m_cache_in_buffer;
net_utils::buffer m_cache_in_buffer;
stream_state m_state;
int32_t m_oponent_protocol_ver;
@ -151,7 +152,7 @@ public:
struct invoke_response_handler_base
{
virtual bool handle(int res, const std::string& buff, connection_context& context)=0;
virtual bool handle(int res, const epee::span<const uint8_t> buff, connection_context& context)=0;
virtual bool is_timer_started() const=0;
virtual void cancel()=0;
virtual bool cancel_timer()=0;
@ -173,7 +174,7 @@ public:
if(ec == boost::asio::error::operation_aborted)
return;
MINFO(con.get_context_ref() << "Timeout on invoke operation happened, command: " << command << " timeout: " << timeout);
std::string fake;
epee::span<const uint8_t> fake;
cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, con.get_context_ref());
con.close();
con.finish_outer_call();
@ -191,7 +192,7 @@ public:
bool m_timer_cancelled;
uint64_t m_timeout;
int m_command;
virtual bool handle(int res, const std::string& buff, typename async_protocol_handler::connection_context& context)
virtual bool handle(int res, const epee::span<const uint8_t> buff, typename async_protocol_handler::connection_context& context)
{
if(!cancel_timer())
return false;
@ -207,7 +208,7 @@ public:
{
if(cancel_timer())
{
std::string fake;
epee::span<const uint8_t> fake;
m_cb(LEVIN_ERROR_CONNECTION_DESTROYED, fake, m_con.get_context_ref());
m_con.finish_outer_call();
}
@ -237,7 +238,7 @@ public:
if(ec == boost::asio::error::operation_aborted)
return;
MINFO(con.get_context_ref() << "Timeout on invoke operation happened, command: " << command << " timeout: " << timeout);
std::string fake;
epee::span<const uint8_t> fake;
cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, con.get_context_ref());
con.close();
con.finish_outer_call();
@ -265,6 +266,7 @@ public:
m_pservice_endpoint(psnd_hndlr),
m_config(config),
m_connection_context(conn_context),
m_cache_in_buffer(256 * 1024),
m_state(stream_state_head)
{
m_close_called = 0;
@ -405,14 +407,7 @@ public:
break;
}
{
std::string buff_to_invoke;
if(m_cache_in_buffer.size() == m_current_head.m_cb)
buff_to_invoke.swap(m_cache_in_buffer);
else
{
buff_to_invoke.assign(m_cache_in_buffer, 0, (std::string::size_type)m_current_head.m_cb);
m_cache_in_buffer.erase(0, (std::string::size_type)m_current_head.m_cb);
}
epee::span<const uint8_t> buff_to_invoke = m_cache_in_buffer.carve((std::string::size_type)m_current_head.m_cb);
bool is_response = (m_oponent_protocol_ver == LEVIN_PROTOCOL_VER_1 && m_current_head.m_flags&LEVIN_PACKET_RESPONSE);
@ -449,8 +444,8 @@ public:
}else
{
CRITICAL_REGION_BEGIN(m_local_inv_buff_lock);
buff_to_invoke.swap(m_local_inv_buff);
buff_to_invoke.clear();
m_local_inv_buff = std::string((const char*)buff_to_invoke.data(), buff_to_invoke.size());
buff_to_invoke = epee::span<const uint8_t>((const uint8_t*)NULL, 0);
m_invoke_result_code = m_current_head.m_return_code;
CRITICAL_REGION_END();
boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 1);
@ -503,7 +498,7 @@ public:
{
if(m_cache_in_buffer.size() < sizeof(bucket_head2))
{
if(m_cache_in_buffer.size() >= sizeof(uint64_t) && *((uint64_t*)m_cache_in_buffer.data()) != SWAP64LE(LEVIN_SIGNATURE))
if(m_cache_in_buffer.size() >= sizeof(uint64_t) && *((uint64_t*)m_cache_in_buffer.span(8).data()) != SWAP64LE(LEVIN_SIGNATURE))
{
MWARNING(m_connection_context << "Signature mismatch, connection will be closed");
return false;
@ -513,9 +508,9 @@ public:
}
#if BYTE_ORDER == LITTLE_ENDIAN
bucket_head2& phead = *(bucket_head2*)m_cache_in_buffer.data();
bucket_head2& phead = *(bucket_head2*)m_cache_in_buffer.span(sizeof(bucket_head2)).data();
#else
bucket_head2 phead = *(bucket_head2*)m_cache_in_buffer.data();
bucket_head2 phead = *(bucket_head2*)m_cache_in_buffer.span(sizeof(bucket_head2)).data();
phead.m_signature = SWAP64LE(phead.m_signature);
phead.m_cb = SWAP64LE(phead.m_cb);
phead.m_command = SWAP32LE(phead.m_command);
@ -530,7 +525,7 @@ public:
}
m_current_head = phead;
m_cache_in_buffer.erase(0, sizeof(bucket_head2));
m_cache_in_buffer.erase(sizeof(bucket_head2));
m_state = stream_state_body;
m_oponent_protocol_ver = m_current_head.m_protocol_version;
if(m_current_head.m_cb > m_config.m_max_packet_size)
@ -562,7 +557,7 @@ public:
}
template<class callback_t>
bool async_invoke(int command, const std::string& in_buff, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
bool async_invoke(int command, const epee::span<const uint8_t> in_buff, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
{
misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler(
boost::bind(&async_protocol_handler::finish_outer_call, this));
@ -606,7 +601,7 @@ public:
break;
}
if(!m_pservice_endpoint->do_send(in_buff.data(), (int)in_buff.size()))
if(!m_pservice_endpoint->do_send(in_buff.data(), in_buff.size()))
{
LOG_ERROR_CC(m_connection_context, "Failed to do_send");
err_code = LEVIN_ERROR_CONNECTION;
@ -623,7 +618,7 @@ public:
if (LEVIN_OK != err_code)
{
std::string stub_buff;
epee::span<const uint8_t> stub_buff{(const uint8_t*)"", 0};
// Never call callback inside critical section, that can cause deadlock
cb(err_code, stub_buff, m_connection_context);
return false;
@ -632,7 +627,7 @@ public:
return true;
}
int invoke(int command, const std::string& in_buff, std::string& buff_out)
int invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out)
{
misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler(
boost::bind(&async_protocol_handler::finish_outer_call, this));
@ -662,7 +657,7 @@ public:
return LEVIN_ERROR_CONNECTION;
}
if(!m_pservice_endpoint->do_send(in_buff.data(), (int)in_buff.size()))
if(!m_pservice_endpoint->do_send(in_buff.data(), in_buff.size()))
{
LOG_ERROR_CC(m_connection_context, "Failed to do_send");
return LEVIN_ERROR_CONNECTION;
@ -706,7 +701,7 @@ public:
return m_invoke_result_code;
}
int notify(int command, const std::string& in_buff)
int notify(int command, const epee::span<const uint8_t> in_buff)
{
misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler(
boost::bind(&async_protocol_handler::finish_outer_call, this));
@ -734,7 +729,7 @@ public:
return -1;
}
if(!m_pservice_endpoint->do_send(in_buff.data(), (int)in_buff.size()))
if(!m_pservice_endpoint->do_send(in_buff.data(), in_buff.size()))
{
LOG_ERROR_CC(m_connection_context, "Failed to do_send()");
return -1;
@ -839,7 +834,7 @@ int async_protocol_handler_config<t_connection_context>::find_and_lock_connectio
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
int async_protocol_handler_config<t_connection_context>::invoke(int command, const std::string& in_buff, std::string& buff_out, boost::uuids::uuid connection_id)
int async_protocol_handler_config<t_connection_context>::invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out, boost::uuids::uuid connection_id)
{
async_protocol_handler<t_connection_context>* aph;
int r = find_and_lock_connection(connection_id, aph);
@ -847,7 +842,7 @@ int async_protocol_handler_config<t_connection_context>::invoke(int command, con
}
//------------------------------------------------------------------------------------------
template<class t_connection_context> template<class callback_t>
int async_protocol_handler_config<t_connection_context>::invoke_async(int command, const std::string& in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout)
int async_protocol_handler_config<t_connection_context>::invoke_async(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout)
{
async_protocol_handler<t_connection_context>* aph;
int r = find_and_lock_connection(connection_id, aph);
@ -896,7 +891,7 @@ void async_protocol_handler_config<t_connection_context>::set_handler(levin_comm
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
int async_protocol_handler_config<t_connection_context>::notify(int command, const std::string& in_buff, boost::uuids::uuid connection_id)
int async_protocol_handler_config<t_connection_context>::notify(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id)
{
async_protocol_handler<t_connection_context>* aph;
int r = find_and_lock_connection(connection_id, aph);