mirror of
https://github.com/monero-project/monero.git
synced 2024-12-25 03:29:23 -05:00
node_server: add race condition demo
This commit is contained in:
parent
2d3ce2d64a
commit
8922f96e61
@ -35,6 +35,7 @@
|
||||
#include "cryptonote_core/i_core_events.h"
|
||||
#include "cryptonote_protocol/cryptonote_protocol_handler.h"
|
||||
#include "cryptonote_protocol/cryptonote_protocol_handler.inl"
|
||||
#include <condition_variable>
|
||||
|
||||
#define MAKE_IPV4_ADDRESS(a,b,c,d) epee::net_utils::ipv4_network_address{MAKE_IP(a,b,c,d),0}
|
||||
#define MAKE_IPV4_ADDRESS_PORT(a,b,c,d,e) epee::net_utils::ipv4_network_address{MAKE_IP(a,b,c,d),e}
|
||||
@ -906,5 +907,264 @@ TEST(cryptonote_protocol_handler, race_condition)
|
||||
remove_tree(dir);
|
||||
}
|
||||
|
||||
TEST(node_server, race_condition)
|
||||
{
|
||||
struct contexts {
|
||||
using cryptonote = cryptonote::cryptonote_connection_context;
|
||||
using p2p = nodetool::p2p_connection_context_t<cryptonote>;
|
||||
};
|
||||
using context_t = contexts::cryptonote;
|
||||
using options_t = boost::program_options::variables_map;
|
||||
using options_description_t = boost::program_options::options_description;
|
||||
using worker_t = std::thread;
|
||||
struct protocol_t {
|
||||
private:
|
||||
using p2p_endpoint_t = nodetool::i_p2p_endpoint<context_t>;
|
||||
using lock_t = std::mutex;
|
||||
using condition_t = std::condition_variable_any;
|
||||
using unique_lock_t = std::unique_lock<lock_t>;
|
||||
p2p_endpoint_t *p2p_endpoint;
|
||||
lock_t lock;
|
||||
condition_t condition;
|
||||
bool started{};
|
||||
size_t counter{};
|
||||
public:
|
||||
using payload_t = cryptonote::CORE_SYNC_DATA;
|
||||
using blob_t = cryptonote::blobdata;
|
||||
using connection_context = context_t;
|
||||
using payload_type = payload_t;
|
||||
using relay_t = cryptonote::relay_method;
|
||||
using string_t = std::string;
|
||||
using span_t = epee::span<const uint8_t>;
|
||||
using blobs_t = epee::span<const cryptonote::blobdata>;
|
||||
using connections_t = std::list<cryptonote::connection_info>;
|
||||
using block_queue_t = cryptonote::block_queue;
|
||||
using stripes_t = std::pair<uint32_t, uint32_t>;
|
||||
using byte_stream_t = epee::byte_stream;
|
||||
struct core_events_t: cryptonote::i_core_events {
|
||||
uint64_t get_current_blockchain_height() const override { return {}; }
|
||||
bool is_synchronized() const override { return {}; }
|
||||
void on_transactions_relayed(blobs_t blobs, relay_t relay) override {}
|
||||
};
|
||||
int handle_invoke_map(bool is_notify, int command, const span_t in, byte_stream_t &out, context_t &context, bool &handled) {
|
||||
return {};
|
||||
}
|
||||
bool on_idle() {
|
||||
if (not p2p_endpoint)
|
||||
return {};
|
||||
{
|
||||
unique_lock_t guard(lock);
|
||||
if (not started)
|
||||
started = true;
|
||||
else
|
||||
return {};
|
||||
}
|
||||
std::vector<blob_t> txs(128 / 64 * 1024 * 1024, blob_t(1, 'x'));
|
||||
worker_t worker([this]{
|
||||
p2p_endpoint->for_each_connection(
|
||||
[this](context_t &, uint64_t, uint32_t){
|
||||
{
|
||||
unique_lock_t guard(lock);
|
||||
++counter;
|
||||
condition.notify_all();
|
||||
condition.wait(guard, [this]{ return counter >= 3; });
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(8));
|
||||
return false;
|
||||
}
|
||||
);
|
||||
});
|
||||
{
|
||||
unique_lock_t guard(lock);
|
||||
++counter;
|
||||
condition.notify_all();
|
||||
condition.wait(guard, [this]{ return counter >= 3; });
|
||||
++counter;
|
||||
condition.notify_all();
|
||||
condition.wait(guard, [this]{ return counter >= 5; });
|
||||
}
|
||||
p2p_endpoint->send_txs(
|
||||
std::move(txs),
|
||||
epee::net_utils::zone::public_,
|
||||
{},
|
||||
relay_t::fluff
|
||||
);
|
||||
worker.join();
|
||||
return {};
|
||||
}
|
||||
bool init(const options_t &options) { return {}; }
|
||||
bool deinit() { return {}; }
|
||||
void set_p2p_endpoint(p2p_endpoint_t *p2p_endpoint) {
|
||||
this->p2p_endpoint = p2p_endpoint;
|
||||
}
|
||||
bool process_payload_sync_data(const payload_t &payload, contexts::p2p &context, bool is_inital) {
|
||||
context.m_state = context_t::state_normal;
|
||||
context.m_needed_objects.resize(512 * 1024);
|
||||
{
|
||||
unique_lock_t guard(lock);
|
||||
++counter;
|
||||
condition.notify_all();
|
||||
condition.wait(guard, [this]{ return counter >= 3; });
|
||||
++counter;
|
||||
condition.notify_all();
|
||||
condition.wait(guard, [this]{ return counter >= 5; });
|
||||
}
|
||||
return true;
|
||||
}
|
||||
bool get_payload_sync_data(blob_t &blob) { return {}; }
|
||||
bool get_payload_sync_data(payload_t &payload) { return {}; }
|
||||
bool on_callback(context_t &context) { return {}; }
|
||||
core_events_t &get_core(){ static core_events_t core_events; return core_events;}
|
||||
void log_connections() {}
|
||||
connections_t get_connections() { return {}; }
|
||||
const block_queue_t &get_block_queue() const {
|
||||
static block_queue_t block_queue;
|
||||
return block_queue;
|
||||
}
|
||||
void stop() {}
|
||||
void on_connection_close(context_t &context) {}
|
||||
void set_max_out_peers(unsigned int max) {}
|
||||
bool no_sync() const { return {}; }
|
||||
void set_no_sync(bool value) {}
|
||||
string_t get_peers_overview() const { return {}; }
|
||||
stripes_t get_next_needed_pruning_stripe() const { return {}; }
|
||||
bool needs_new_sync_connections() const { return {}; }
|
||||
bool is_busy_syncing() { return {}; }
|
||||
};
|
||||
using node_server_t = nodetool::node_server<protocol_t>;
|
||||
auto conduct_test = [](protocol_t &protocol){
|
||||
struct messages {
|
||||
struct core {
|
||||
using sync = cryptonote::CORE_SYNC_DATA;
|
||||
};
|
||||
using handshake = nodetool::COMMAND_HANDSHAKE_T<core::sync>;
|
||||
};
|
||||
using handler_t = epee::levin::async_protocol_handler<context_t>;
|
||||
using connection_t = epee::net_utils::connection<handler_t>;
|
||||
using connection_ptr = boost::shared_ptr<connection_t>;
|
||||
using shared_state_t = typename connection_t::shared_state;
|
||||
using shared_state_ptr = std::shared_ptr<shared_state_t>;
|
||||
using io_context_t = boost::asio::io_service;
|
||||
using work_t = boost::asio::io_service::work;
|
||||
using work_ptr = std::shared_ptr<work_t>;
|
||||
using workers_t = std::vector<std::thread>;
|
||||
using endpoint_t = boost::asio::ip::tcp::endpoint;
|
||||
using event_t = epee::simple_event;
|
||||
struct command_handler_t: epee::levin::levin_commands_handler<context_t> {
|
||||
using span_t = epee::span<const uint8_t>;
|
||||
using byte_stream_t = epee::byte_stream;
|
||||
int invoke(int, const span_t, byte_stream_t &, context_t &) override { return {}; }
|
||||
int notify(int, const span_t, context_t &) override { return {}; }
|
||||
void callback(context_t &) override {}
|
||||
void on_connection_new(context_t &) override {}
|
||||
void on_connection_close(context_t &) override {}
|
||||
~command_handler_t() override {}
|
||||
static void destroy(epee::levin::levin_commands_handler<context_t>* ptr) { delete ptr; }
|
||||
};
|
||||
io_context_t io_context;
|
||||
work_ptr work = std::make_shared<work_t>(io_context);
|
||||
workers_t workers;
|
||||
while (workers.size() < 4) {
|
||||
workers.emplace_back([&io_context]{
|
||||
io_context.run();
|
||||
});
|
||||
}
|
||||
io_context.post([&]{
|
||||
protocol.on_idle();
|
||||
});
|
||||
io_context.post([&]{
|
||||
protocol.on_idle();
|
||||
});
|
||||
shared_state_ptr shared_state = std::make_shared<shared_state_t>();
|
||||
shared_state->set_handler(new command_handler_t, &command_handler_t::destroy);
|
||||
connection_ptr conn{new connection_t(io_context, shared_state, {}, {})};
|
||||
endpoint_t endpoint(boost::asio::ip::address::from_string("127.0.0.1"), 48080);
|
||||
conn->socket().connect(endpoint);
|
||||
conn->socket().set_option(boost::asio::ip::tcp::socket::reuse_address(true));
|
||||
conn->start({}, {});
|
||||
context_t context;
|
||||
conn->get_context(context);
|
||||
event_t handshaked;
|
||||
typename messages::handshake::request_t msg{{
|
||||
::config::NETWORK_ID,
|
||||
58080,
|
||||
}};
|
||||
epee::net_utils::async_invoke_remote_command2<typename messages::handshake::response>(
|
||||
context,
|
||||
messages::handshake::ID,
|
||||
msg,
|
||||
*shared_state,
|
||||
[conn, &handshaked](int code, const typename messages::handshake::response &msg, context_t &context){
|
||||
EXPECT_TRUE(code >= 0);
|
||||
handshaked.raise();
|
||||
},
|
||||
P2P_DEFAULT_HANDSHAKE_INVOKE_TIMEOUT
|
||||
);
|
||||
handshaked.wait();
|
||||
conn->strand_.post([conn]{
|
||||
conn->cancel();
|
||||
});
|
||||
conn.reset();
|
||||
work.reset();
|
||||
for (auto& w: workers) {
|
||||
w.join();
|
||||
}
|
||||
};
|
||||
using path_t = boost::filesystem::path;
|
||||
using ec_t = boost::system::error_code;
|
||||
auto create_dir = []{
|
||||
ec_t ec;
|
||||
path_t path = boost::filesystem::temp_directory_path() / boost::filesystem::unique_path("daemon-%%%%%%%%%%%%%%%%", ec);
|
||||
if (ec)
|
||||
return path_t{};
|
||||
auto success = boost::filesystem::create_directory(path, ec);
|
||||
if (not ec && success)
|
||||
return path;
|
||||
return path_t{};
|
||||
};
|
||||
auto remove_tree = [](const path_t &path){
|
||||
ec_t ec;
|
||||
boost::filesystem::remove_all(path, ec);
|
||||
};
|
||||
const auto dir = create_dir();
|
||||
ASSERT_TRUE(not dir.empty());
|
||||
protocol_t protocol{};
|
||||
node_server_t node_server(protocol);
|
||||
protocol.set_p2p_endpoint(&node_server);
|
||||
node_server.init(
|
||||
[&dir]{
|
||||
options_t options;
|
||||
boost::program_options::store(
|
||||
boost::program_options::command_line_parser({
|
||||
"--p2p-bind-ip=127.0.0.1",
|
||||
"--p2p-bind-port=48080",
|
||||
"--out-peers=0",
|
||||
"--data-dir",
|
||||
dir.string(),
|
||||
"--no-igd",
|
||||
"--add-exclusive-node=127.0.0.1:48080",
|
||||
"--check-updates=disabled",
|
||||
"--disable-dns-checkpoints",
|
||||
}).options([]{
|
||||
options_description_t options_description{};
|
||||
cryptonote::core::init_options(options_description);
|
||||
node_server_t::init_options(options_description);
|
||||
return options_description;
|
||||
}()).run(),
|
||||
options
|
||||
);
|
||||
return options;
|
||||
}()
|
||||
);
|
||||
worker_t worker([&]{
|
||||
node_server.run();
|
||||
});
|
||||
conduct_test(protocol);
|
||||
node_server.send_stop_signal();
|
||||
worker.join();
|
||||
node_server.deinit();
|
||||
remove_tree(dir);
|
||||
}
|
||||
|
||||
namespace nodetool { template class node_server<cryptonote::t_cryptonote_protocol_handler<test_core>>; }
|
||||
namespace cryptonote { template class t_cryptonote_protocol_handler<test_core>; }
|
||||
|
Loading…
Reference in New Issue
Block a user