luigi1111 2018-06-27 15:33:01 -05:00
commit a844844cda
No known key found for this signature in database
GPG key ID: F4ACA0183641E010
32 changed files with 763 additions and 496 deletions

View file

@ -36,6 +36,7 @@
#include "common/util.h"
static __thread int depth = 0;
static __thread bool is_leaf = false;
namespace tools
{
@ -43,9 +44,9 @@ threadpool::threadpool(unsigned int max_threads) : running(true), active(0) {
boost::thread::attributes attrs;
attrs.set_stack_size(THREAD_STACK_SIZE);
max = max_threads ? max_threads : tools::get_max_concurrency();
unsigned int i = max;
size_t i = max ? max - 1 : 0;
while(i--) {
threads.push_back(boost::thread(attrs, boost::bind(&threadpool::run, this)));
threads.push_back(boost::thread(attrs, boost::bind(&threadpool::run, this, false)));
}
}
@ -60,20 +61,25 @@ threadpool::~threadpool() {
}
}
void threadpool::submit(waiter *obj, std::function<void()> f) {
entry e = {obj, f};
void threadpool::submit(waiter *obj, std::function<void()> f, bool leaf) {
CHECK_AND_ASSERT_THROW_MES(!is_leaf, "A leaf routine is using a thread pool");
boost::unique_lock<boost::mutex> lock(mutex);
if ((active == max && !queue.empty()) || depth > 0) {
if (!leaf && ((active == max && !queue.empty()) || depth > 0)) {
// if all available threads are already running
// and there's work waiting, just run in current thread
lock.unlock();
++depth;
is_leaf = leaf;
f();
--depth;
is_leaf = false;
} else {
if (obj)
obj->inc();
queue.push_back(e);
if (leaf)
queue.push_front({obj, f, leaf});
else
queue.push_back({obj, f, leaf});
has_work.notify_one();
}
}
@ -91,7 +97,7 @@ threadpool::waiter::~waiter()
}
try
{
wait();
wait(NULL);
}
catch (const std::exception &e)
{
@ -99,9 +105,12 @@ threadpool::waiter::~waiter()
}
}
void threadpool::waiter::wait() {
void threadpool::waiter::wait(threadpool *tpool) {
if (tpool)
tpool->run(true);
boost::unique_lock<boost::mutex> lock(mt);
while(num) cv.wait(lock);
while(num)
cv.wait(lock);
}
void threadpool::waiter::inc() {
@ -113,15 +122,19 @@ void threadpool::waiter::dec() {
const boost::unique_lock<boost::mutex> lock(mt);
num--;
if (!num)
cv.notify_one();
cv.notify_all();
}
void threadpool::run() {
void threadpool::run(bool flush) {
boost::unique_lock<boost::mutex> lock(mutex);
while (running) {
entry e;
while(queue.empty() && running)
{
if (flush)
return;
has_work.wait(lock);
}
if (!running) break;
active++;
@ -129,8 +142,10 @@ void threadpool::run() {
queue.pop_front();
lock.unlock();
++depth;
is_leaf = e.leaf;
e.f();
--depth;
is_leaf = false;
if (e.wo)
e.wo->dec();

View file

@ -59,7 +59,7 @@ public:
public:
void inc();
void dec();
void wait(); //! Wait for a set of tasks to finish.
void wait(threadpool *tpool); //! Wait for a set of tasks to finish.
waiter() : num(0){}
~waiter();
};
@ -67,7 +67,7 @@ public:
// Submit a task to the pool. The waiter pointer may be
// NULL if the caller doesn't care to wait for the
// task to finish.
void submit(waiter *waiter, std::function<void()> f);
void submit(waiter *waiter, std::function<void()> f, bool leaf = false);
unsigned int get_max_concurrency() const;
@ -78,6 +78,7 @@ public:
typedef struct entry {
waiter *wo;
std::function<void()> f;
bool leaf;
} entry;
std::deque<entry> queue;
boost::condition_variable has_work;
@ -86,7 +87,7 @@ public:
unsigned int active;
unsigned int max;
bool running;
void run();
void run(bool flush = false);
};
}