Use a threadpool

Instead of constantly creating and destroying threads
This commit is contained in:
Howard Chu 2017-09-14 04:39:37 +01:00
parent 7abdba0a5c
commit 510d0d4753
No known key found for this signature in database
GPG key ID: FD2A70B44AB11BA7
15 changed files with 302 additions and 963 deletions

View file

@ -45,6 +45,7 @@
#include "profile_tools.h"
#include "file_io_utils.h"
#include "common/int-util.h"
#include "common/threadpool.h"
#include "common/boost_serialization_helper.h"
#include "warnings.h"
#include "crypto/hash.h"
@ -2513,33 +2514,9 @@ bool Blockchain::check_tx_inputs(transaction& tx, tx_verification_context &tvc,
std::vector < uint64_t > results;
results.resize(tx.vin.size(), 0);
int threads = tools::get_max_concurrency();
boost::asio::io_service ioservice;
boost::thread_group threadpool;
bool ioservice_active = false;
std::unique_ptr < boost::asio::io_service::work > work(new boost::asio::io_service::work(ioservice));
if(threads > 1)
{
for (int i = 0; i < threads; i++)
{
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice));
}
ioservice_active = true;
}
#define KILL_IOSERVICE() \
if(ioservice_active) \
{ \
work.reset(); \
while (!ioservice.stopped()) ioservice.poll(); \
threadpool.join_all(); \
ioservice.stop(); \
ioservice_active = false; \
}
epee::misc_utils::auto_scope_leave_caller ioservice_killer = epee::misc_utils::create_scope_leave_handler([&]() { KILL_IOSERVICE(); });
tools::threadpool& tpool = tools::threadpool::getInstance();
tools::threadpool::waiter waiter;
int threads = tpool.get_max_concurrency();
for (const auto& txin : tx.vin)
{
@ -2600,7 +2577,7 @@ bool Blockchain::check_tx_inputs(transaction& tx, tx_verification_context &tvc,
{
// ND: Speedup
// 1. Thread ring signature verification if possible.
ioservice.dispatch(boost::bind(&Blockchain::check_ring_signature, this, std::cref(tx_prefix_hash), std::cref(in_to_key.k_image), std::cref(pubkeys[sig_index]), std::cref(tx.signatures[sig_index]), std::ref(results[sig_index])));
tpool.submit(&waiter, boost::bind(&Blockchain::check_ring_signature, this, std::cref(tx_prefix_hash), std::cref(in_to_key.k_image), std::cref(pubkeys[sig_index]), std::cref(tx.signatures[sig_index]), std::ref(results[sig_index])));
}
else
{
@ -2623,8 +2600,8 @@ bool Blockchain::check_tx_inputs(transaction& tx, tx_verification_context &tvc,
sig_index++;
}
KILL_IOSERVICE();
if (tx.version == 1 && threads > 1)
waiter.wait();
if (tx.version == 1)
{
@ -3699,7 +3676,8 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::list<block_complete_e
return true;
bool blocks_exist = false;
uint64_t threads = tools::get_max_concurrency();
tools::threadpool& tpool = tools::threadpool::getInstance();
uint64_t threads = tpool.get_max_concurrency();
if (blocks_entry.size() > 1 && threads > 1 && m_max_prepare_blocks_threads > 1)
{
@ -3708,15 +3686,12 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::list<block_complete_e
threads = m_max_prepare_blocks_threads;
uint64_t height = m_db->height();
std::vector<boost::thread *> thread_list;
int batches = blocks_entry.size() / threads;
int extra = blocks_entry.size() % threads;
MDEBUG("block_batches: " << batches);
std::vector<std::unordered_map<crypto::hash, crypto::hash>> maps(threads);
std::vector < std::vector < block >> blocks(threads);
auto it = blocks_entry.begin();
boost::thread::attributes attrs;
attrs.set_stack_size(THREAD_STACK_SIZE);
for (uint64_t i = 0; i < threads; i++)
{
@ -3775,19 +3750,14 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::list<block_complete_e
{
m_blocks_longhash_table.clear();
uint64_t thread_height = height;
tools::threadpool::waiter waiter;
for (uint64_t i = 0; i < threads; i++)
{
thread_list.push_back(new boost::thread(attrs, boost::bind(&Blockchain::block_longhash_worker, this, thread_height, std::cref(blocks[i]), std::ref(maps[i]))));
tpool.submit(&waiter, boost::bind(&Blockchain::block_longhash_worker, this, thread_height, std::cref(blocks[i]), std::ref(maps[i])));
thread_height += blocks[i].size();
}
for (size_t j = 0; j < thread_list.size(); j++)
{
thread_list[j]->join();
delete thread_list[j];
}
thread_list.clear();
waiter.wait();
if (m_cancel)
return false;
@ -3911,30 +3881,20 @@ bool Blockchain::prepare_handle_incoming_blocks(const std::list<block_complete_e
// [output] stores all transactions for each tx_out_index::hash found
std::vector<std::unordered_map<crypto::hash, cryptonote::transaction>> transactions(amounts.size());
threads = tools::get_max_concurrency();
threads = tpool.get_max_concurrency();
if (!m_db->can_thread_bulk_indices())
threads = 1;
if (threads > 1)
{
boost::asio::io_service ioservice;
boost::thread_group threadpool;
std::unique_ptr < boost::asio::io_service::work > work(new boost::asio::io_service::work(ioservice));
for (uint64_t i = 0; i < threads; i++)
{
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice));
}
tools::threadpool::waiter waiter;
for (size_t i = 0; i < amounts.size(); i++)
{
uint64_t amount = amounts[i];
ioservice.dispatch(boost::bind(&Blockchain::output_scan_worker, this, amount, std::cref(offset_map[amount]), std::ref(tx_map[amount]), std::ref(transactions[i])));
tpool.submit(&waiter, boost::bind(&Blockchain::output_scan_worker, this, amount, std::cref(offset_map[amount]), std::ref(tx_map[amount]), std::ref(transactions[i])));
}
work.reset();
threadpool.join_all();
ioservice.stop();
waiter.wait();
}
else
{