threadpool: allow leaf functions to run concurrently

Decrease the number of worker threads by one to account
for the fact the calling thread acts as a worker thread now
This commit is contained in:
moneromooo-monero 2018-04-26 11:44:47 +01:00
parent 2704624eae
commit 2771a18e85
No known key found for this signature in database
GPG key ID: 686F07454D6CEFC3
7 changed files with 112 additions and 51 deletions

View file

@ -36,6 +36,7 @@
#include "common/util.h"
static __thread int depth = 0;
static __thread bool is_leaf = false;
namespace tools
{
@ -43,9 +44,9 @@ threadpool::threadpool(unsigned int max_threads) : running(true), active(0) {
boost::thread::attributes attrs;
attrs.set_stack_size(THREAD_STACK_SIZE);
max = max_threads ? max_threads : tools::get_max_concurrency();
unsigned int i = max;
size_t i = max ? max - 1 : 0;
while(i--) {
threads.push_back(boost::thread(attrs, boost::bind(&threadpool::run, this)));
threads.push_back(boost::thread(attrs, boost::bind(&threadpool::run, this, false)));
}
}
@ -60,20 +61,25 @@ threadpool::~threadpool() {
}
}
void threadpool::submit(waiter *obj, std::function<void()> f) {
entry e = {obj, f};
void threadpool::submit(waiter *obj, std::function<void()> f, bool leaf) {
CHECK_AND_ASSERT_THROW_MES(!is_leaf, "A leaf routine is using a thread pool");
boost::unique_lock<boost::mutex> lock(mutex);
if ((active == max && !queue.empty()) || depth > 0) {
if (!leaf && ((active == max && !queue.empty()) || depth > 0)) {
// if all available threads are already running
// and there's work waiting, just run in current thread
lock.unlock();
++depth;
is_leaf = leaf;
f();
--depth;
is_leaf = false;
} else {
if (obj)
obj->inc();
queue.push_back(e);
if (leaf)
queue.push_front({obj, f, leaf});
else
queue.push_back({obj, f, leaf});
has_work.notify_one();
}
}
@ -91,7 +97,7 @@ threadpool::waiter::~waiter()
}
try
{
wait();
wait(NULL);
}
catch (const std::exception &e)
{
@ -99,9 +105,12 @@ threadpool::waiter::~waiter()
}
}
void threadpool::waiter::wait() {
void threadpool::waiter::wait(threadpool *tpool) {
if (tpool)
tpool->run(true);
boost::unique_lock<boost::mutex> lock(mt);
while(num) cv.wait(lock);
while(num)
cv.wait(lock);
}
void threadpool::waiter::inc() {
@ -113,15 +122,19 @@ void threadpool::waiter::dec() {
const boost::unique_lock<boost::mutex> lock(mt);
num--;
if (!num)
cv.notify_one();
cv.notify_all();
}
void threadpool::run() {
void threadpool::run(bool flush) {
boost::unique_lock<boost::mutex> lock(mutex);
while (running) {
entry e;
while(queue.empty() && running)
{
if (flush)
return;
has_work.wait(lock);
}
if (!running) break;
active++;
@ -129,8 +142,10 @@ void threadpool::run() {
queue.pop_front();
lock.unlock();
++depth;
is_leaf = e.leaf;
e.f();
--depth;
is_leaf = false;
if (e.wo)
e.wo->dec();