Revert "Merge pull request #7136"

This reverts commit 63c7ca07fb, reversing
changes made to 2218e23e84.
This commit is contained in:
luigi1111 2021-04-16 13:52:44 -05:00
parent d544fd0f52
commit e45619e61e
20 changed files with 215 additions and 319 deletions

View file

@ -31,7 +31,6 @@
#include <cstdint>
#include "byte_stream.h"
#include "net_utils_base.h"
#include "span.h"
@ -84,12 +83,11 @@ namespace levin
#define LEVIN_PROTOCOL_VER_0 0
#define LEVIN_PROTOCOL_VER_1 1
template<class t_connection_context = net_utils::connection_context_base>
struct levin_commands_handler
{
virtual int invoke(int command, const epee::span<const uint8_t> in_buff, byte_stream& buff_out, t_connection_context& context)=0;
virtual int invoke(int command, const epee::span<const uint8_t> in_buff, byte_slice& buff_out, t_connection_context& context)=0;
virtual int notify(int command, const epee::span<const uint8_t> in_buff, t_connection_context& context)=0;
virtual void callback(t_connection_context& context){};
@ -127,41 +125,12 @@ namespace levin
}
}
//! Provides space for levin (p2p) header, so that payload can be sent without copy
class message_writer
{
byte_slice finalize(uint32_t command, uint32_t flags, uint32_t return_code, bool expect_response);
public:
using header = bucket_head2;
explicit message_writer(std::size_t reserve = 8192);
message_writer(const message_writer&) = delete;
message_writer(message_writer&&) = default;
~message_writer() = default;
message_writer& operator=(const message_writer&) = delete;
message_writer& operator=(message_writer&&) = default;
//! \return Size of payload (excludes header size).
std::size_t payload_size() const noexcept
{
return buffer.size() < sizeof(header) ? 0 : buffer.size() - sizeof(header);
}
byte_slice finalize_invoke(uint32_t command) { return finalize(command, LEVIN_PACKET_REQUEST, 0, true); }
byte_slice finalize_notify(uint32_t command) { return finalize(command, LEVIN_PACKET_REQUEST, 0, false); }
byte_slice finalize_response(uint32_t command, uint32_t return_code)
{
return finalize(command, LEVIN_PACKET_RESPONSE, return_code, false);
}
//! Has space for levin header until a finalize method is used
byte_stream buffer;
};
//! \return Intialized levin header.
bucket_head2 make_header(uint32_t command, uint64_t msg_size, uint32_t flags, bool expect_response) noexcept;
//! \return A levin notification message.
byte_slice make_notify(int command, epee::span<const std::uint8_t> payload);
/*! Generate a dummy levin message.
\param noise_bytes Total size of the returned `byte_slice`.
@ -171,11 +140,12 @@ namespace levin
/*! Generate 1+ levin messages that are identical to the noise message size.
\param noise_size Each levin message will be identical to this value.
\param noise Each levin message will be identical to the size of this
message. The bytes from this message will be used for padding.
\return `nullptr` if `noise.size()` is less than the levin header size.
Otherwise, a levin notification message OR 2+ levin fragment messages.
Each message is `noise.size()` in length. */
byte_slice make_fragmented_notify(const std::size_t noise_size, int command, message_writer message);
byte_slice make_fragmented_notify(const byte_slice& noise, int command, epee::span<const std::uint8_t> payload);
}
}

View file

@ -51,21 +51,6 @@
#define MIN_BYTES_WANTED 512
#endif
template<typename context_t>
void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, const char* category)
{
MCINFO("net.p2p.traffic", context << bytes << " bytes " << (sent ? "sent" : "received") << (error ? "/corrupt" : "")
<< " for category " << category << " initiated by " << (initiator ? "us" : "peer"));
}
template<typename context_t>
void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, int command)
{
char buf[32];
snprintf(buf, sizeof(buf), "command-%u", command);
on_levin_traffic(context, initiator, sent, error, bytes, buf);
}
namespace epee
{
namespace levin
@ -103,10 +88,11 @@ public:
uint64_t m_max_packet_size;
uint64_t m_invoke_timeout;
int invoke(int command, message_writer in_msg, std::string& buff_out, boost::uuids::uuid connection_id);
int invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out, boost::uuids::uuid connection_id);
template<class callback_t>
int invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED);
int invoke_async(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED);
int notify(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id);
int send(epee::byte_slice message, const boost::uuids::uuid& connection_id);
bool close(boost::uuids::uuid connection_id);
bool update_connection_context(const t_connection_context& contxt);
@ -136,17 +122,12 @@ class async_protocol_handler
{
std::string m_fragment_buffer;
bool send_message(byte_slice message)
bool send_message(uint32_t command, epee::span<const uint8_t> in_buff, uint32_t flags, bool expect_response)
{
if (message.size() < sizeof(message_writer::header))
const bucket_head2 head = make_header(command, in_buff.size(), flags, expect_response);
if(!m_pservice_endpoint->do_send(byte_slice{as_byte_span(head), in_buff}))
return false;
message_writer::header head;
std::memcpy(std::addressof(head), message.data(), sizeof(head));
if(!m_pservice_endpoint->do_send(std::move(message)))
return false;
on_levin_traffic(m_connection_context, true, true, false, head.m_cb, head.m_command);
MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb
<< ", flags" << head.m_flags
<< ", r?=" << head.m_have_to_return_data
@ -542,17 +523,26 @@ public:
{
if(m_current_head.m_have_to_return_data)
{
levin::message_writer return_message{32 * 1024};
byte_slice return_buff;
const uint32_t return_code = m_config.m_pcommands_handler->invoke(
m_current_head.m_command, buff_to_invoke, return_message.buffer, m_connection_context
m_current_head.m_command, buff_to_invoke, return_buff, m_connection_context
);
// peer_id remains unset if dropped
if (m_current_head.m_command == m_connection_context.handshake_command() && m_connection_context.handshake_complete())
m_max_packet_size = m_config.m_max_packet_size;
if(!send_message(return_message.finalize_response(m_current_head.m_command, return_code)))
bucket_head2 head = make_header(m_current_head.m_command, return_buff.size(), LEVIN_PACKET_RESPONSE, false);
head.m_return_code = SWAP32LE(return_code);
if(!m_pservice_endpoint->do_send(byte_slice{{epee::as_byte_span(head), epee::to_span(return_buff)}}))
return false;
MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb
<< ", flags" << head.m_flags
<< ", r?=" << head.m_have_to_return_data
<<", cmd = " << head.m_command
<< ", ver=" << head.m_protocol_version);
}
else
m_config.m_pcommands_handler->notify(m_current_head.m_command, buff_to_invoke, m_connection_context);
@ -629,7 +619,7 @@ public:
}
template<class callback_t>
bool async_invoke(int command, message_writer in_msg, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
bool async_invoke(int command, const epee::span<const uint8_t> in_buff, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED)
{
misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler(
boost::bind(&async_protocol_handler::finish_outer_call, this));
@ -648,7 +638,7 @@ public:
if (command == m_connection_context.handshake_command())
m_max_packet_size = m_config.m_max_packet_size;
if(!send_message(in_msg.finalize_invoke(command)))
if(!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true))
{
LOG_ERROR_CC(m_connection_context, "Failed to do_send");
err_code = LEVIN_ERROR_CONNECTION;
@ -674,7 +664,7 @@ public:
return true;
}
int invoke(int command, message_writer in_msg, std::string& buff_out)
int invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out)
{
misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler(
boost::bind(&async_protocol_handler::finish_outer_call, this));
@ -686,7 +676,7 @@ public:
if (command == m_connection_context.handshake_command())
m_max_packet_size = m_config.m_max_packet_size;
if (!send_message(in_msg.finalize_invoke(command)))
if (!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true))
{
LOG_ERROR_CC(m_connection_context, "Failed to send request");
return LEVIN_ERROR_CONNECTION;
@ -723,9 +713,25 @@ public:
return m_invoke_result_code;
}
/*! Sends `message` without adding a levin header. The message must have been
created with `make_noise_notify`, `make_fragmented_notify`, or
`message_writer::finalize_notify`. See additional instructions for
int notify(int command, const epee::span<const uint8_t> in_buff)
{
misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler(
boost::bind(&async_protocol_handler::finish_outer_call, this));
CRITICAL_REGION_LOCAL(m_call_lock);
if (!send_message(command, in_buff, LEVIN_PACKET_REQUEST, false))
{
LOG_ERROR_CC(m_connection_context, "Failed to send notify message");
return -1;
}
return 1;
}
/*! Sends `message` without adding a levin header. The message must have
been created with `make_notify`, `make_noise_notify` or
`make_fragmented_notify`. See additional instructions for
`make_fragmented_notify`.
\return 1 on success */
@ -735,11 +741,14 @@ public:
boost::bind(&async_protocol_handler::finish_outer_call, this)
);
if (!send_message(std::move(message)))
const std::size_t length = message.size();
if (!m_pservice_endpoint->do_send(std::move(message)))
{
LOG_ERROR_CC(m_connection_context, "Failed to send message, dropping it");
return -1;
}
MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << (length - sizeof(bucket_head2)) << ", r?=0]");
return 1;
}
//------------------------------------------------------------------------------------------
@ -829,19 +838,19 @@ int async_protocol_handler_config<t_connection_context>::find_and_lock_connectio
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
int async_protocol_handler_config<t_connection_context>::invoke(int command, message_writer in_msg, std::string& buff_out, boost::uuids::uuid connection_id)
int async_protocol_handler_config<t_connection_context>::invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out, boost::uuids::uuid connection_id)
{
async_protocol_handler<t_connection_context>* aph;
int r = find_and_lock_connection(connection_id, aph);
return LEVIN_OK == r ? aph->invoke(command, std::move(in_msg), buff_out) : r;
return LEVIN_OK == r ? aph->invoke(command, in_buff, buff_out) : r;
}
//------------------------------------------------------------------------------------------
template<class t_connection_context> template<class callback_t>
int async_protocol_handler_config<t_connection_context>::invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout)
int async_protocol_handler_config<t_connection_context>::invoke_async(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout)
{
async_protocol_handler<t_connection_context>* aph;
int r = find_and_lock_connection(connection_id, aph);
return LEVIN_OK == r ? aph->async_invoke(command, std::move(in_msg), cb, timeout) : r;
return LEVIN_OK == r ? aph->async_invoke(command, in_buff, cb, timeout) : r;
}
//------------------------------------------------------------------------------------------
template<class t_connection_context> template<class callback_t>
@ -920,6 +929,14 @@ void async_protocol_handler_config<t_connection_context>::set_handler(levin_comm
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
int async_protocol_handler_config<t_connection_context>::notify(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id)
{
async_protocol_handler<t_connection_context>* aph;
int r = find_and_lock_connection(connection_id, aph);
return LEVIN_OK == r ? aph->notify(command, in_buff) : r;
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
int async_protocol_handler_config<t_connection_context>::send(byte_slice message, const boost::uuids::uuid& connection_id)
{
async_protocol_handler<t_connection_context>* aph;