diff --git a/src/cryptonote_protocol/levin_notify.cpp b/src/cryptonote_protocol/levin_notify.cpp index ab4eeeb82..8b36ca58c 100644 --- a/src/cryptonote_protocol/levin_notify.cpp +++ b/src/cryptonote_protocol/levin_notify.cpp @@ -298,6 +298,12 @@ namespace levin boost::asio::steady_timer next_epoch; boost::asio::steady_timer flush_txs; boost::asio::io_service::strand strand; + struct context_t { + std::vector fluff_txs; + std::chrono::steady_clock::time_point flush_time; + bool m_is_income; + }; + boost::unordered_map contexts; net::dandelionpp::connection_map map;//!< Tracks outgoing uuid's for noise channels or Dandelion++ stems std::deque channels; //!< Never touch after init; only update elements on `noise_channel.strand` std::atomic connection_count; //!< Only update in strand, can be read at any time @@ -374,14 +380,16 @@ namespace levin const auto now = std::chrono::steady_clock::now(); auto next_flush = std::chrono::steady_clock::time_point::max(); std::vector, boost::uuids::uuid>> connections{}; - zone_->p2p->foreach_connection([timer_error, now, &next_flush, &connections] (detail::p2p_context& context) + for (auto &e: zone_->contexts) { + auto &id = e.first; + auto &context = e.second; if (!context.fluff_txs.empty()) { if (context.flush_time <= now || timer_error) // flush on canceled timer { context.flush_time = std::chrono::steady_clock::time_point::max(); - connections.emplace_back(std::move(context.fluff_txs), context.m_connection_id); + connections.emplace_back(std::move(context.fluff_txs), id); context.fluff_txs.clear(); } else // not flushing yet @@ -389,8 +397,7 @@ namespace levin } else // nothing to flush context.flush_time = std::chrono::steady_clock::time_point::max(); - return true; - }); + } /* Always send with `fluff` flag, even over i2p/tor. The hidden service will disable the forwarding delay and immediately fluff. The i2p/tor @@ -438,22 +445,21 @@ namespace levin MDEBUG("Queueing " << txs.size() << " transaction(s) for Dandelion++ fluffing"); - - zone->p2p->foreach_connection([txs, now, &zone, &source, &in_duration, &out_duration, &next_flush] (detail::p2p_context& context) + for (auto &e: zone->contexts) { + auto &id = e.first; + auto &context = e.second; // When i2p/tor, only fluff to outbound connections - if (context.handshake_complete() && source != context.m_connection_id && (zone->nzone == epee::net_utils::zone::public_ || !context.m_is_income)) + if (source != id && (zone->nzone == epee::net_utils::zone::public_ || !context.m_is_income)) { if (context.fluff_txs.empty()) context.flush_time = now + (context.m_is_income ? in_duration() : out_duration()); next_flush = std::min(next_flush, context.flush_time); context.fluff_txs.reserve(context.fluff_txs.size() + txs.size()); - for (const blobdata& tx : txs) - context.fluff_txs.push_back(tx); // must copy instead of move (multiple conns) + context.fluff_txs.insert(context.fluff_txs.end(), txs.begin(), txs.end()); } - return true; - }); + } if (next_flush == std::chrono::steady_clock::time_point::max()) MWARNING("Unable to send transaction(s), no available connections"); @@ -764,6 +770,32 @@ namespace levin ); } + void notify::on_handshake_complete(const boost::uuids::uuid &id, bool is_income) + { + if (!zone_) + return; + + auto& zone = zone_; + zone_->strand.dispatch([zone, id, is_income]{ + zone->contexts[id] = { + .fluff_txs = {}, + .flush_time = std::chrono::steady_clock::time_point::max(), + .m_is_income = is_income, + }; + }); + } + + void notify::on_connection_close(const boost::uuids::uuid &id) + { + if (!zone_) + return; + + auto& zone = zone_; + zone_->strand.dispatch([zone, id]{ + zone->contexts.erase(id); + }); + } + void notify::run_epoch() { if (!zone_) diff --git a/src/cryptonote_protocol/levin_notify.h b/src/cryptonote_protocol/levin_notify.h index abbf9d461..12704746a 100644 --- a/src/cryptonote_protocol/levin_notify.h +++ b/src/cryptonote_protocol/levin_notify.h @@ -101,6 +101,9 @@ namespace levin //! Probe for new outbound connection - skips if not needed. void new_out_connection(); + void on_handshake_complete(const boost::uuids::uuid &id, bool is_income); + void on_connection_close(const boost::uuids::uuid &id); + //! Run the logic for the next epoch immediately. Only use in testing. void run_epoch(); diff --git a/src/p2p/net_node.h b/src/p2p/net_node.h index 59a6e5091..3660d2edb 100644 --- a/src/p2p/net_node.h +++ b/src/p2p/net_node.h @@ -111,15 +111,11 @@ namespace nodetool struct p2p_connection_context_t: base_type //t_payload_net_handler::connection_context //public net_utils::connection_context_base { p2p_connection_context_t() - : fluff_txs(), - flush_time(std::chrono::steady_clock::time_point::max()), - peer_id(0), + : peer_id(0), support_flags(0), m_in_timedsync(false) {} - std::vector fluff_txs; - std::chrono::steady_clock::time_point flush_time; peerid_type peer_id; uint32_t support_flags; bool m_in_timedsync; diff --git a/src/p2p/net_node.inl b/src/p2p/net_node.inl index b8cf2d124..438b8ca11 100644 --- a/src/p2p/net_node.inl +++ b/src/p2p/net_node.inl @@ -1432,6 +1432,7 @@ namespace nodetool ape.first_seen = first_seen_stamp ? first_seen_stamp : time(nullptr); zone.m_peerlist.append_with_peer_anchor(ape); + zone.m_notifier.on_handshake_complete(con->m_connection_id, con->m_is_income); zone.m_notifier.new_out_connection(); LOG_DEBUG_CC(*con, "CONNECTION HANDSHAKED OK."); @@ -2559,6 +2560,8 @@ namespace nodetool return 1; } + zone.m_notifier.on_handshake_complete(context.m_connection_id, context.m_is_income); + if(has_too_many_connections(context.m_remote_address)) { LOG_PRINT_CCONTEXT_L1("CONNECTION FROM " << context.m_remote_address.host_str() << " REFUSED, too many connections from the same address"); @@ -2683,6 +2686,9 @@ namespace nodetool zone.m_peerlist.remove_from_peer_anchor(na); } + if (!zone.m_net_server.is_stop_signal_sent()) { + zone.m_notifier.on_connection_close(context.m_connection_id); + } m_payload_handler.on_connection_close(context); MINFO("["<< epee::net_utils::print_connection_context(context) << "] CLOSE CONNECTION"); diff --git a/tests/unit_tests/levin.cpp b/tests/unit_tests/levin.cpp index 4d8c5225e..3e7a32557 100644 --- a/tests/unit_tests/levin.cpp +++ b/tests/unit_tests/levin.cpp @@ -265,11 +265,17 @@ namespace virtual void callback(cryptonote::levin::detail::p2p_context& context) override final {} - virtual void on_connection_new(cryptonote::levin::detail::p2p_context&) override final - {} + virtual void on_connection_new(cryptonote::levin::detail::p2p_context& context) override final + { + if (notifier) + notifier->on_handshake_complete(context.m_connection_id, context.m_is_income); + } - virtual void on_connection_close(cryptonote::levin::detail::p2p_context&) override final - {} + virtual void on_connection_close(cryptonote::levin::detail::p2p_context& context) override final + { + if (notifier) + notifier->on_connection_close(context.m_connection_id); + } public: test_receiver() @@ -306,6 +312,8 @@ namespace { return get_raw_message(notified_); } + + std::shared_ptr notifier{}; }; class levin_notify : public ::testing::Test @@ -343,13 +351,16 @@ namespace EXPECT_EQ(connection_ids_.size(), connections_->get_connections_count()); } - cryptonote::levin::notify make_notifier(const std::size_t noise_size, bool is_public, bool pad_txs) + std::shared_ptr make_notifier(const std::size_t noise_size, bool is_public, bool pad_txs) { epee::byte_slice noise = nullptr; if (noise_size) noise = epee::levin::make_noise_notify(noise_size); epee::net_utils::zone zone = is_public ? epee::net_utils::zone::public_ : epee::net_utils::zone::i2p; - return cryptonote::levin::notify{io_service_, connections_, std::move(noise), zone, pad_txs, events_}; + receiver_.notifier.reset( + new cryptonote::levin::notify{io_service_, connections_, std::move(noise), zone, pad_txs, events_} + ); + return receiver_.notifier; } boost::uuids::random_generator random_generator_; @@ -517,7 +528,8 @@ TEST_F(levin_notify, defaulted) TEST_F(levin_notify, fluff_without_padding) { - cryptonote::levin::notify notifier = make_notifier(0, true, false); + std::shared_ptr notifier_ptr = make_notifier(0, true, false); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < 10; ++count) add_connection(count % 2 == 0); @@ -563,7 +575,8 @@ TEST_F(levin_notify, fluff_without_padding) TEST_F(levin_notify, stem_without_padding) { - cryptonote::levin::notify notifier = make_notifier(0, true, false); + std::shared_ptr notifier_ptr = make_notifier(0, true, false); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < 10; ++count) add_connection(count % 2 == 0); @@ -635,7 +648,8 @@ TEST_F(levin_notify, stem_without_padding) TEST_F(levin_notify, stem_no_outs_without_padding) { - cryptonote::levin::notify notifier = make_notifier(0, true, false); + std::shared_ptr notifier_ptr = make_notifier(0, true, false); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < 10; ++count) add_connection(true); @@ -690,7 +704,8 @@ TEST_F(levin_notify, stem_no_outs_without_padding) TEST_F(levin_notify, local_without_padding) { - cryptonote::levin::notify notifier = make_notifier(0, true, false); + std::shared_ptr notifier_ptr = make_notifier(0, true, false); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < 10; ++count) add_connection(count % 2 == 0); @@ -762,7 +777,8 @@ TEST_F(levin_notify, local_without_padding) TEST_F(levin_notify, forward_without_padding) { - cryptonote::levin::notify notifier = make_notifier(0, true, false); + std::shared_ptr notifier_ptr = make_notifier(0, true, false); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < 10; ++count) add_connection(count % 2 == 0); @@ -834,7 +850,8 @@ TEST_F(levin_notify, forward_without_padding) TEST_F(levin_notify, block_without_padding) { - cryptonote::levin::notify notifier = make_notifier(0, true, false); + std::shared_ptr notifier_ptr = make_notifier(0, true, false); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < 10; ++count) add_connection(count % 2 == 0); @@ -863,7 +880,8 @@ TEST_F(levin_notify, block_without_padding) TEST_F(levin_notify, none_without_padding) { - cryptonote::levin::notify notifier = make_notifier(0, true, false); + std::shared_ptr notifier_ptr = make_notifier(0, true, false); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < 10; ++count) add_connection(count % 2 == 0); @@ -892,7 +910,8 @@ TEST_F(levin_notify, none_without_padding) TEST_F(levin_notify, fluff_with_padding) { - cryptonote::levin::notify notifier = make_notifier(0, true, true); + std::shared_ptr notifier_ptr = make_notifier(0, true, true); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < 10; ++count) add_connection(count % 2 == 0); @@ -938,7 +957,8 @@ TEST_F(levin_notify, fluff_with_padding) TEST_F(levin_notify, stem_with_padding) { - cryptonote::levin::notify notifier = make_notifier(0, true, true); + std::shared_ptr notifier_ptr = make_notifier(0, true, true); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < 10; ++count) add_connection(count % 2 == 0); @@ -1005,7 +1025,8 @@ TEST_F(levin_notify, stem_with_padding) TEST_F(levin_notify, stem_no_outs_with_padding) { - cryptonote::levin::notify notifier = make_notifier(0, true, true); + std::shared_ptr notifier_ptr = make_notifier(0, true, true); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < 10; ++count) add_connection(true); @@ -1059,7 +1080,8 @@ TEST_F(levin_notify, stem_no_outs_with_padding) TEST_F(levin_notify, local_with_padding) { - cryptonote::levin::notify notifier = make_notifier(0, true, true); + std::shared_ptr notifier_ptr = make_notifier(0, true, true); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < 10; ++count) add_connection(count % 2 == 0); @@ -1126,7 +1148,8 @@ TEST_F(levin_notify, local_with_padding) TEST_F(levin_notify, forward_with_padding) { - cryptonote::levin::notify notifier = make_notifier(0, true, true); + std::shared_ptr notifier_ptr = make_notifier(0, true, true); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < 10; ++count) add_connection(count % 2 == 0); @@ -1193,7 +1216,8 @@ TEST_F(levin_notify, forward_with_padding) TEST_F(levin_notify, block_with_padding) { - cryptonote::levin::notify notifier = make_notifier(0, true, true); + std::shared_ptr notifier_ptr = make_notifier(0, true, true); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < 10; ++count) add_connection(count % 2 == 0); @@ -1222,7 +1246,8 @@ TEST_F(levin_notify, block_with_padding) TEST_F(levin_notify, none_with_padding) { - cryptonote::levin::notify notifier = make_notifier(0, true, true); + std::shared_ptr notifier_ptr = make_notifier(0, true, true); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < 10; ++count) add_connection(count % 2 == 0); @@ -1251,7 +1276,8 @@ TEST_F(levin_notify, none_with_padding) TEST_F(levin_notify, private_fluff_without_padding) { - cryptonote::levin::notify notifier = make_notifier(0, false, false); + std::shared_ptr notifier_ptr = make_notifier(0, false, false); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < 10; ++count) add_connection(count % 2 == 0); @@ -1302,7 +1328,8 @@ TEST_F(levin_notify, private_fluff_without_padding) TEST_F(levin_notify, private_stem_without_padding) { // private mode always uses fluff but marked as stem - cryptonote::levin::notify notifier = make_notifier(0, false, false); + std::shared_ptr notifier_ptr = make_notifier(0, false, false); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < 10; ++count) add_connection(count % 2 == 0); @@ -1353,7 +1380,8 @@ TEST_F(levin_notify, private_stem_without_padding) TEST_F(levin_notify, private_local_without_padding) { // private mode always uses fluff but marked as stem - cryptonote::levin::notify notifier = make_notifier(0, false, false); + std::shared_ptr notifier_ptr = make_notifier(0, false, false); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < 10; ++count) add_connection(count % 2 == 0); @@ -1404,7 +1432,8 @@ TEST_F(levin_notify, private_local_without_padding) TEST_F(levin_notify, private_forward_without_padding) { // private mode always uses fluff but marked as stem - cryptonote::levin::notify notifier = make_notifier(0, false, false); + std::shared_ptr notifier_ptr = make_notifier(0, false, false); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < 10; ++count) add_connection(count % 2 == 0); @@ -1455,7 +1484,8 @@ TEST_F(levin_notify, private_forward_without_padding) TEST_F(levin_notify, private_block_without_padding) { // private mode always uses fluff but marked as stem - cryptonote::levin::notify notifier = make_notifier(0, false, false); + std::shared_ptr notifier_ptr = make_notifier(0, false, false); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < 10; ++count) add_connection(count % 2 == 0); @@ -1485,7 +1515,8 @@ TEST_F(levin_notify, private_block_without_padding) TEST_F(levin_notify, private_none_without_padding) { // private mode always uses fluff but marked as stem - cryptonote::levin::notify notifier = make_notifier(0, false, false); + std::shared_ptr notifier_ptr = make_notifier(0, false, false); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < 10; ++count) add_connection(count % 2 == 0); @@ -1514,7 +1545,8 @@ TEST_F(levin_notify, private_none_without_padding) TEST_F(levin_notify, private_fluff_with_padding) { - cryptonote::levin::notify notifier = make_notifier(0, false, true); + std::shared_ptr notifier_ptr = make_notifier(0, false, true); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < 10; ++count) add_connection(count % 2 == 0); @@ -1564,7 +1596,8 @@ TEST_F(levin_notify, private_fluff_with_padding) TEST_F(levin_notify, private_stem_with_padding) { - cryptonote::levin::notify notifier = make_notifier(0, false, true); + std::shared_ptr notifier_ptr = make_notifier(0, false, true); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < 10; ++count) add_connection(count % 2 == 0); @@ -1614,7 +1647,8 @@ TEST_F(levin_notify, private_stem_with_padding) TEST_F(levin_notify, private_local_with_padding) { - cryptonote::levin::notify notifier = make_notifier(0, false, true); + std::shared_ptr notifier_ptr = make_notifier(0, false, true); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < 10; ++count) add_connection(count % 2 == 0); @@ -1664,7 +1698,8 @@ TEST_F(levin_notify, private_local_with_padding) TEST_F(levin_notify, private_forward_with_padding) { - cryptonote::levin::notify notifier = make_notifier(0, false, true); + std::shared_ptr notifier_ptr = make_notifier(0, false, true); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < 10; ++count) add_connection(count % 2 == 0); @@ -1714,7 +1749,8 @@ TEST_F(levin_notify, private_forward_with_padding) TEST_F(levin_notify, private_block_with_padding) { - cryptonote::levin::notify notifier = make_notifier(0, false, true); + std::shared_ptr notifier_ptr = make_notifier(0, false, true); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < 10; ++count) add_connection(count % 2 == 0); @@ -1743,7 +1779,8 @@ TEST_F(levin_notify, private_block_with_padding) TEST_F(levin_notify, private_none_with_padding) { - cryptonote::levin::notify notifier = make_notifier(0, false, true); + std::shared_ptr notifier_ptr = make_notifier(0, false, true); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < 10; ++count) add_connection(count % 2 == 0); @@ -1774,7 +1811,8 @@ TEST_F(levin_notify, stem_mappings) { static constexpr const unsigned test_connections_count = (CRYPTONOTE_DANDELIONPP_STEMS + 1) * 2; - cryptonote::levin::notify notifier = make_notifier(0, true, false); + std::shared_ptr notifier_ptr = make_notifier(0, true, false); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < test_connections_count; ++count) add_connection(count % 2 == 0); @@ -1897,7 +1935,8 @@ TEST_F(levin_notify, fluff_multiple) { static constexpr const unsigned test_connections_count = (CRYPTONOTE_DANDELIONPP_STEMS + 1) * 2; - cryptonote::levin::notify notifier = make_notifier(0, true, false); + std::shared_ptr notifier_ptr = make_notifier(0, true, false); + auto ¬ifier = *notifier_ptr; for (unsigned count = 0; count < test_connections_count; ++count) add_connection(count % 2 == 0); @@ -2015,7 +2054,8 @@ TEST_F(levin_notify, noise) txs[0].resize(1900, 'h'); const boost::uuids::uuid incoming_id = random_generator_(); - cryptonote::levin::notify notifier = make_notifier(2048, false, true); + std::shared_ptr notifier_ptr = make_notifier(2048, false, true); + auto ¬ifier = *notifier_ptr; { const auto status = notifier.get_status(); @@ -2106,7 +2146,8 @@ TEST_F(levin_notify, noise_stem) txs[0].resize(1900, 'h'); const boost::uuids::uuid incoming_id = random_generator_(); - cryptonote::levin::notify notifier = make_notifier(2048, false, true); + std::shared_ptr notifier_ptr = make_notifier(2048, false, true); + auto ¬ifier = *notifier_ptr; { const auto status = notifier.get_status();