From 9847a7a45ffb92788f07b130413b6793d4af7653 Mon Sep 17 00:00:00 2001 From: csoler Date: Thu, 5 May 2016 20:52:10 -0400 Subject: [PATCH] implemented a new semaphore class that should be cross plateform --- libretroshare/src/util/rsthreads.cc | 38 +++++++------------ libretroshare/src/util/rsthreads.h | 59 +++++++++++++++++++++++++++-- 2 files changed, 69 insertions(+), 28 deletions(-) diff --git a/libretroshare/src/util/rsthreads.cc b/libretroshare/src/util/rsthreads.cc index d3f87b34a..896ecebd0 100644 --- a/libretroshare/src/util/rsthreads.cc +++ b/libretroshare/src/util/rsthreads.cc @@ -40,6 +40,7 @@ * #define DEBUG_THREADS 1 * #define RSMUTEX_ABORT 1 // Catch wrong pthreads mode. *******/ +#define DEBUG_THREADS 1 #define THREAD_DEBUG std::cerr << "[caller thread ID: " << std::hex << pthread_self() << ", thread ID: " << mTid << std::dec << "] " #ifdef RSMUTEX_ABORT @@ -68,11 +69,8 @@ void *RsThread::rsthread_init(void* p) thread -> runloop(); return NULL; } -RsThread::RsThread () : mMutex("RsThread") +RsThread::RsThread() { - sem_init(&mHasStoppedSemaphore,0,1) ; - sem_init(&mShouldStopSemaphore,0,0) ; - #ifdef WINDOWS_SYS memset (&mTid, 0, sizeof(mTid)); #else @@ -82,8 +80,7 @@ RsThread::RsThread () : mMutex("RsThread") bool RsThread::isRunning() { // do we need a mutex for this ? - int sval =0; - sem_getvalue(&mHasStoppedSemaphore,&sval) ; + int sval = mHasStoppedSemaphore.value() ; #ifdef DEBUG_THREADS THREAD_DEBUG << " isRunning(): returning " << !sval << std::endl; @@ -93,12 +90,7 @@ bool RsThread::isRunning() bool RsThread::shouldStop() { - int sval =0; - sem_getvalue(&mShouldStopSemaphore,&sval) ; - -#ifdef DEBUG_THREADS - THREAD_DEBUG << " shouldStop(): returning " << (sval > 0) << " (sval=" << sval << ") " << std::endl; -#endif + int sval = mShouldStopSemaphore.value() ; return sval > 0; } @@ -108,8 +100,7 @@ void RsTickingThread::shutdown() THREAD_DEBUG << "pqithreadstreamer::shutdown()" << std::endl; #endif - int sval =0; - sem_getvalue(&mHasStoppedSemaphore,&sval) ; + int sval = mHasStoppedSemaphore.value() ; if(sval > 0) { @@ -127,7 +118,7 @@ void RsThread::ask_for_stop() #ifdef DEBUG_THREADS THREAD_DEBUG << " calling stop" << std::endl; #endif - sem_post(&mShouldStopSemaphore) ; + mShouldStopSemaphore.post(); } void RsTickingThread::fullstop() @@ -137,7 +128,7 @@ void RsTickingThread::fullstop() #ifdef DEBUG_THREADS THREAD_DEBUG << " waiting stop" << std::endl; #endif - sem_wait(&mHasStoppedSemaphore) ; + mHasStoppedSemaphore.wait(); #ifdef DEBUG_THREADS THREAD_DEBUG << " finished!" << std::endl; #endif @@ -147,12 +138,11 @@ void RsThread::start() pthread_t tid; void *data = (void *)this ; - RS_STACK_MUTEX(mMutex) ; - #ifdef DEBUG_THREADS THREAD_DEBUG << "pqithreadstreamer::start() initing should_stop=0, has_stopped=1" << std::endl; #endif - sem_init(&mHasStoppedSemaphore,0,0) ; + mHasStoppedSemaphore.set(0) ; + mShouldStopSemaphore.set(0) ; int err ; @@ -164,7 +154,7 @@ void RsThread::start() else { THREAD_DEBUG << "Fatal error: pthread_create could not create a thread. Error returned: " << err << " !!!!!!!" << std::endl; - sem_init(&mHasStoppedSemaphore,0,1) ; + mHasStoppedSemaphore.set(1) ; } } @@ -175,13 +165,11 @@ RsTickingThread::RsTickingThread() #ifdef DEBUG_THREADS THREAD_DEBUG << "RsTickingThread::RsTickingThread()" << std::endl; #endif - sem_init(&mShouldStopSemaphore,0,0) ; } void RsSingleJobThread::runloop() { - sem_init(&mShouldStopSemaphore,0,0) ; - + mShouldStopSemaphore.set(0) ; run() ; } @@ -190,7 +178,7 @@ void RsTickingThread::runloop() #ifdef DEBUG_THREADS THREAD_DEBUG << "pqithreadstream::runloop()" << std::endl; #endif - sem_init(&mShouldStopSemaphore,0,0) ; + mShouldStopSemaphore.set(0) ; while(1) { @@ -199,7 +187,7 @@ void RsTickingThread::runloop() #ifdef DEBUG_THREADS THREAD_DEBUG << "pqithreadstreamer::runloop(): asked to stop. setting hasStopped=1, and returning. Thread ends." << std::endl; #endif - sem_post(&mHasStoppedSemaphore) ; + mHasStoppedSemaphore.post(); return ; } diff --git a/libretroshare/src/util/rsthreads.h b/libretroshare/src/util/rsthreads.h index ffce99891..19d48b66f 100644 --- a/libretroshare/src/util/rsthreads.h +++ b/libretroshare/src/util/rsthreads.h @@ -29,7 +29,9 @@ #include #include #include +#include #include +#include /* RsIface Thread Wrappers */ @@ -168,6 +170,58 @@ class RsStackMutex // #define RS_STACK_MUTEX(m) RsStackMutex __local_retroshare_mutex(m,__PRETTY_FUNCTION__,__FILE__,__LINE__) +// This class handles a Mutex-based semaphore, that makes it cross plateform. +class RsSemaphore +{ + class RsSemStruct + { + public: + RsSemStruct() : mtx("Semaphore mutex"), val(0) {} + + RsMutex mtx ; + uint32_t val ; + }; + +public: + RsSemaphore() + { + s = new RsSemStruct ; + } + + ~RsSemaphore() + { + delete s ; + } + + void set(uint32_t i) + { + RS_STACK_MUTEX(s->mtx) ; + s->val = i ; + } + + void post() + { + RS_STACK_MUTEX(s->mtx) ; + ++(s->val) ; + } + + uint32_t value() + { + RS_STACK_MUTEX(s->mtx) ; + return s->val ; + } + + void wait() + { + while(value() == 0) + usleep(1000) ; + + post() ; + } +private: + RsSemStruct *s ; +}; + class RsThread; /* to create a thread! */ @@ -198,11 +252,10 @@ class RsThread protected: virtual void runloop() =0; /* called once the thread is started. Should be overloaded by subclasses. */ - sem_t mHasStoppedSemaphore; - sem_t mShouldStopSemaphore; + RsSemaphore mHasStoppedSemaphore; + RsSemaphore mShouldStopSemaphore; static void *rsthread_init(void*) ; - RsMutex mMutex; pthread_t mTid; };