mirror of
https://github.com/monero-project/monero.git
synced 2025-08-09 16:22:23 -04:00
Merge pull request #8427
1fc60ca
Publish submitted txs via zmq (j-berman)
This commit is contained in:
commit
ce80747c58
10 changed files with 142 additions and 11 deletions
|
@ -1406,21 +1406,66 @@ namespace cryptonote
|
|||
return true;
|
||||
}
|
||||
//-----------------------------------------------------------------------------------------------
|
||||
bool core::notify_txpool_event(const epee::span<const cryptonote::blobdata> tx_blobs, epee::span<const crypto::hash> tx_hashes, epee::span<const cryptonote::transaction> txs, const std::vector<bool> &just_broadcasted) const
|
||||
{
|
||||
if (!m_zmq_pub)
|
||||
return true;
|
||||
|
||||
if (tx_blobs.size() != tx_hashes.size() || tx_blobs.size() != txs.size() || tx_blobs.size() != just_broadcasted.size())
|
||||
return false;
|
||||
|
||||
/* Publish txs via ZMQ that are "just broadcasted" by the daemon. This is
|
||||
done here in addition to `handle_incoming_txs` in order to guarantee txs
|
||||
are pub'd via ZMQ when we know the daemon has/will broadcast to other
|
||||
nodes & *after* the tx is visible in the pool. This should get called
|
||||
when the user submits a tx to a daemon in the "fluff" epoch relaying txs
|
||||
via a public network. */
|
||||
if (std::count(just_broadcasted.begin(), just_broadcasted.end(), true) == 0)
|
||||
return true;
|
||||
|
||||
std::vector<txpool_event> results{};
|
||||
results.resize(tx_blobs.size());
|
||||
for (std::size_t i = 0; i < results.size(); ++i)
|
||||
{
|
||||
results[i].tx = std::move(txs[i]);
|
||||
results[i].hash = std::move(tx_hashes[i]);
|
||||
results[i].blob_size = tx_blobs[i].size();
|
||||
results[i].weight = results[i].tx.pruned ? get_pruned_transaction_weight(results[i].tx) : get_transaction_weight(results[i].tx, results[i].blob_size);
|
||||
results[i].res = just_broadcasted[i];
|
||||
}
|
||||
|
||||
m_zmq_pub(std::move(results));
|
||||
|
||||
return true;
|
||||
}
|
||||
//-----------------------------------------------------------------------------------------------
|
||||
void core::on_transactions_relayed(const epee::span<const cryptonote::blobdata> tx_blobs, const relay_method tx_relay)
|
||||
{
|
||||
// lock ensures duplicate txs aren't pub'd via zmq
|
||||
CRITICAL_REGION_LOCAL(m_incoming_tx_lock);
|
||||
|
||||
std::vector<crypto::hash> tx_hashes{};
|
||||
tx_hashes.resize(tx_blobs.size());
|
||||
|
||||
std::vector<cryptonote::transaction> txs{};
|
||||
txs.resize(tx_blobs.size());
|
||||
|
||||
for (std::size_t i = 0; i < tx_blobs.size(); ++i)
|
||||
{
|
||||
cryptonote::transaction tx{};
|
||||
if (!parse_and_validate_tx_from_blob(tx_blobs[i], tx, tx_hashes[i]))
|
||||
if (!parse_and_validate_tx_from_blob(tx_blobs[i], txs[i], tx_hashes[i]))
|
||||
{
|
||||
LOG_ERROR("Failed to parse relayed transaction");
|
||||
return;
|
||||
}
|
||||
}
|
||||
m_mempool.set_relayed(epee::to_span(tx_hashes), tx_relay);
|
||||
|
||||
std::vector<bool> just_broadcasted{};
|
||||
just_broadcasted.reserve(tx_hashes.size());
|
||||
|
||||
m_mempool.set_relayed(epee::to_span(tx_hashes), tx_relay, just_broadcasted);
|
||||
|
||||
if (m_zmq_pub && matches_category(tx_relay, relay_category::legacy))
|
||||
notify_txpool_event(tx_blobs, epee::to_span(tx_hashes), epee::to_span(txs), just_broadcasted);
|
||||
}
|
||||
//-----------------------------------------------------------------------------------------------
|
||||
bool core::get_block_template(block& b, const account_public_address& adr, difficulty_type& diffic, uint64_t& height, uint64_t& expected_reward, const blobdata& ex_nonce, uint64_t &seed_height, crypto::hash &seed_hash)
|
||||
|
|
|
@ -1035,6 +1035,13 @@ namespace cryptonote
|
|||
*/
|
||||
bool relay_txpool_transactions();
|
||||
|
||||
/**
|
||||
* @brief sends notification of txpool events to subscribers
|
||||
*
|
||||
* @return true on success, false otherwise
|
||||
*/
|
||||
bool notify_txpool_event(const epee::span<const cryptonote::blobdata> tx_blobs, epee::span<const crypto::hash> tx_hashes, epee::span<const cryptonote::transaction> txs, const std::vector<bool> &just_broadcasted) const;
|
||||
|
||||
/**
|
||||
* @brief checks DNS versions
|
||||
*
|
||||
|
|
|
@ -839,8 +839,10 @@ namespace cryptonote
|
|||
return true;
|
||||
}
|
||||
//---------------------------------------------------------------------------------
|
||||
void tx_memory_pool::set_relayed(const epee::span<const crypto::hash> hashes, const relay_method method)
|
||||
void tx_memory_pool::set_relayed(const epee::span<const crypto::hash> hashes, const relay_method method, std::vector<bool> &just_broadcasted)
|
||||
{
|
||||
just_broadcasted.clear();
|
||||
|
||||
crypto::random_poisson_seconds embargo_duration{dandelionpp_embargo_average};
|
||||
const auto now = std::chrono::system_clock::now();
|
||||
uint64_t next_relay = uint64_t{std::numeric_limits<time_t>::max()};
|
||||
|
@ -850,12 +852,14 @@ namespace cryptonote
|
|||
LockedTXN lock(m_blockchain.get_db());
|
||||
for (const auto& hash : hashes)
|
||||
{
|
||||
bool was_just_broadcasted = false;
|
||||
try
|
||||
{
|
||||
txpool_tx_meta_t meta;
|
||||
if (m_blockchain.get_txpool_tx_meta(hash, meta))
|
||||
{
|
||||
// txes can be received as "stem" or "fluff" in either order
|
||||
const bool already_broadcasted = meta.matches(relay_category::broadcasted);
|
||||
meta.upgrade_relay_method(method);
|
||||
meta.relayed = true;
|
||||
|
||||
|
@ -868,6 +872,9 @@ namespace cryptonote
|
|||
meta.last_relayed_time = std::chrono::system_clock::to_time_t(now);
|
||||
|
||||
m_blockchain.update_txpool_tx(hash, meta);
|
||||
|
||||
// wait until db update succeeds to ensure tx is visible in the pool
|
||||
was_just_broadcasted = !already_broadcasted && meta.matches(relay_category::broadcasted);
|
||||
}
|
||||
}
|
||||
catch (const std::exception &e)
|
||||
|
@ -875,6 +882,7 @@ namespace cryptonote
|
|||
MERROR("Failed to update txpool transaction metadata: " << e.what());
|
||||
// continue
|
||||
}
|
||||
just_broadcasted.emplace_back(was_just_broadcasted);
|
||||
}
|
||||
lock.commit();
|
||||
set_if_less(m_next_check, time_t(next_relay));
|
||||
|
|
|
@ -357,8 +357,10 @@ namespace cryptonote
|
|||
*
|
||||
* @param hashes list of tx hashes that are about to be relayed
|
||||
* @param tx_relay update how the tx left this node
|
||||
* @param just_broadcasted true if a tx was just broadcasted
|
||||
*
|
||||
*/
|
||||
void set_relayed(epee::span<const crypto::hash> hashes, relay_method tx_relay);
|
||||
void set_relayed(epee::span<const crypto::hash> hashes, relay_method tx_relay, std::vector<bool> &just_broadcasted);
|
||||
|
||||
/**
|
||||
* @brief get the total number of transactions in the pool
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue