Dandelion++: skip desynced peers in stem phase

This commit is contained in:
xiphon 2020-10-12 12:32:17 +00:00
parent 3cbb44a2fd
commit a12a8174e0
9 changed files with 78 additions and 64 deletions

View file

@ -2538,7 +2538,7 @@ skip:
local mempool before doing the relay. The code was already updating the
DB twice on received transactions - it is difficult to workaround this
due to the internal design. */
return m_p2p->send_txs(std::move(arg.txs), zone, source, m_core, tx_relay) != epee::net_utils::zone::invalid;
return m_p2p->send_txs(std::move(arg.txs), zone, source, tx_relay) != epee::net_utils::zone::invalid;
}
//------------------------------------------------------------------------------------------------------------------------
template<class t_core>

View file

@ -105,8 +105,8 @@ namespace levin
return std::chrono::steady_clock::duration{crypto::rand_range(rep(0), range.count())};
}
//! \return All outgoing connections supporting fragments in `connections`.
std::vector<boost::uuids::uuid> get_out_connections(connections& p2p)
//! \return Outgoing connections supporting fragments in `connections` filtered by remote blockchain height.
std::vector<boost::uuids::uuid> get_out_connections(connections& p2p, uint64_t min_blockchain_height)
{
std::vector<boost::uuids::uuid> outs;
outs.reserve(connection_id_reserve_size);
@ -115,8 +115,8 @@ namespace levin
the reserve call so a strand is not used. Investigate if there is lots
of waiting in here. */
p2p.foreach_connection([&outs] (detail::p2p_context& context) {
if (!context.m_is_income)
p2p.foreach_connection([&outs, min_blockchain_height] (detail::p2p_context& context) {
if (!context.m_is_income && context.m_remote_blockchain_height >= min_blockchain_height)
outs.emplace_back(context.m_connection_id);
return true;
});
@ -544,7 +544,7 @@ namespace levin
}
// connection list may be outdated, try again
update_channels::run(zone_, get_out_connections(*zone_->p2p));
update_channels::run(zone_, get_out_connections(*zone_->p2p, core_->get_target_blockchain_height()));
}
MERROR("Unable to send transaction(s) via Dandelion++ stem");
@ -591,8 +591,9 @@ namespace levin
{
std::shared_ptr<detail::zone> zone_;
const std::size_t channel_;
const i_core_events* core_;
static void wait(const std::chrono::steady_clock::time_point start, std::shared_ptr<detail::zone> zone, const std::size_t index)
static void wait(const std::chrono::steady_clock::time_point start, std::shared_ptr<detail::zone> zone, const std::size_t index, const i_core_events* core)
{
if (!zone)
return;
@ -600,7 +601,7 @@ namespace levin
noise_channel& channel = zone->channels.at(index);
channel.next_noise.expires_at(start + noise_min_delay + random_duration(noise_delay_range));
channel.next_noise.async_wait(
channel.strand.wrap(send_noise{std::move(zone), index})
channel.strand.wrap(send_noise{std::move(zone), index, core})
);
}
@ -645,7 +646,7 @@ namespace levin
channel.active = nullptr;
channel.connection = boost::uuids::nil_uuid();
auto connections = get_out_connections(*zone_->p2p);
auto connections = get_out_connections(*zone_->p2p, core_->get_target_blockchain_height());
if (connections.empty())
MWARNING("Lost all outbound connections to anonymity network - currently unable to send transaction(s)");
@ -653,7 +654,7 @@ namespace levin
}
}
wait(start, std::move(zone_), channel_);
wait(start, std::move(zone_), channel_, core_);
}
};
@ -665,6 +666,7 @@ namespace levin
std::chrono::seconds min_epoch_;
std::chrono::seconds epoch_range_;
std::size_t count_;
const i_core_events* core_;
//! \pre Should not be invoked within any strand to prevent blocking.
void operator()(const boost::system::error_code error = {})
@ -677,8 +679,9 @@ namespace levin
const bool fluffing = crypto::rand_idx(unsigned(100)) < CRYPTONOTE_DANDELIONPP_FLUFF_PROBABILITY;
const auto start = std::chrono::steady_clock::now();
auto connections = get_out_connections(*(zone_->p2p), core_->get_target_blockchain_height());
zone_->strand.dispatch(
change_channels{zone_, net::dandelionpp::connection_map{get_out_connections(*(zone_->p2p)), count_}, fluffing}
change_channels{zone_, net::dandelionpp::connection_map{std::move(connections), count_}, fluffing}
);
detail::zone& alias = *zone_;
@ -688,8 +691,9 @@ namespace levin
};
} // anonymous
notify::notify(boost::asio::io_service& service, std::shared_ptr<connections> p2p, epee::byte_slice noise, const bool is_public, const bool pad_txs)
notify::notify(boost::asio::io_service& service, std::shared_ptr<connections> p2p, epee::byte_slice noise, const bool is_public, const bool pad_txs, i_core_events& core)
: zone_(std::make_shared<detail::zone>(service, std::move(p2p), std::move(noise), is_public, pad_txs))
, core_(std::addressof(core))
{
if (!zone_->p2p)
throw std::logic_error{"cryptonote::levin::notify cannot have nullptr p2p argument"};
@ -702,10 +706,10 @@ namespace levin
const auto epoch_range = noise_enabled ? noise_epoch_range : dandelionpp_epoch_range;
const std::size_t out_count = noise_enabled ? CRYPTONOTE_NOISE_CHANNELS : CRYPTONOTE_DANDELIONPP_STEMS;
start_epoch{zone_, min_epoch, epoch_range, out_count}();
start_epoch{zone_, min_epoch, epoch_range, out_count, core_}();
for (std::size_t channel = 0; channel < zone_->channels.size(); ++channel)
send_noise::wait(now, zone_, channel);
send_noise::wait(now, zone_, channel, core_);
}
}
@ -726,7 +730,7 @@ namespace levin
return;
zone_->strand.dispatch(
update_channels{zone_, get_out_connections(*(zone_->p2p))}
update_channels{zone_, get_out_connections(*(zone_->p2p), core_->get_target_blockchain_height())}
);
}
@ -753,7 +757,7 @@ namespace levin
zone_->flush_txs.cancel();
}
bool notify::send_txs(std::vector<blobdata> txs, const boost::uuids::uuid& source, i_core_events& core, relay_method tx_relay)
bool notify::send_txs(std::vector<blobdata> txs, const boost::uuids::uuid& source, relay_method tx_relay)
{
if (txs.empty())
return true;
@ -785,7 +789,7 @@ namespace levin
tx_relay = relay_method::local; // do not put into stempool embargo (hopefully not there already!).
}
core.on_transactions_relayed(epee::to_span(txs), tx_relay);
core_->on_transactions_relayed(epee::to_span(txs), tx_relay);
// Padding is not useful when using noise mode. Send as stem so receiver
// forwards in Dandelion++ mode.
@ -821,7 +825,7 @@ namespace levin
{
// this will change a local/forward tx to stem or fluff ...
zone_->strand.dispatch(
dandelionpp_notify{zone_, std::addressof(core), std::move(txs), source}
dandelionpp_notify{zone_, core_, std::move(txs), source}
);
break;
}
@ -832,7 +836,7 @@ namespace levin
routine. A "fluff" over i2p/tor is not the same as a "fluff" over
ipv4/6. Marking it as "fluff" here will make the tx immediately
visible externally from this node, which is not desired. */
core.on_transactions_relayed(epee::to_span(txs), tx_relay);
core_->on_transactions_relayed(epee::to_span(txs), tx_relay);
zone_->strand.dispatch(fluff_notify{zone_, std::move(txs), source});
break;
}

View file

@ -69,6 +69,7 @@ namespace levin
class notify
{
std::shared_ptr<detail::zone> zone_;
i_core_events* core_;
public:
struct status
@ -80,10 +81,11 @@ namespace levin
//! Construct an instance that cannot notify.
notify() noexcept
: zone_(nullptr)
, core_(nullptr)
{}
//! Construct an instance with available notification `zones`.
explicit notify(boost::asio::io_service& service, std::shared_ptr<connections> p2p, epee::byte_slice noise, bool is_public, bool pad_txs);
explicit notify(boost::asio::io_service& service, std::shared_ptr<connections> p2p, epee::byte_slice noise, bool is_public, bool pad_txs, i_core_events& core);
notify(const notify&) = delete;
notify(notify&&) = default;
@ -123,7 +125,7 @@ namespace levin
particular stem.
\return True iff the notification is queued for sending. */
bool send_txs(std::vector<blobdata> txs, const boost::uuids::uuid& source, i_core_events& core, relay_method tx_relay);
bool send_txs(std::vector<blobdata> txs, const boost::uuids::uuid& source, relay_method tx_relay);
};
} // levin
} // net