implemented a new semaphore class that should be cross plateform

This commit is contained in:
csoler 2016-05-05 20:52:10 -04:00
parent 092345c3fb
commit 9847a7a45f
2 changed files with 69 additions and 28 deletions

View File

@ -40,6 +40,7 @@
* #define DEBUG_THREADS 1 * #define DEBUG_THREADS 1
* #define RSMUTEX_ABORT 1 // Catch wrong pthreads mode. * #define RSMUTEX_ABORT 1 // Catch wrong pthreads mode.
*******/ *******/
#define DEBUG_THREADS 1
#define THREAD_DEBUG std::cerr << "[caller thread ID: " << std::hex << pthread_self() << ", thread ID: " << mTid << std::dec << "] " #define THREAD_DEBUG std::cerr << "[caller thread ID: " << std::hex << pthread_self() << ", thread ID: " << mTid << std::dec << "] "
#ifdef RSMUTEX_ABORT #ifdef RSMUTEX_ABORT
@ -68,11 +69,8 @@ void *RsThread::rsthread_init(void* p)
thread -> runloop(); thread -> runloop();
return NULL; return NULL;
} }
RsThread::RsThread () : mMutex("RsThread") RsThread::RsThread()
{ {
sem_init(&mHasStoppedSemaphore,0,1) ;
sem_init(&mShouldStopSemaphore,0,0) ;
#ifdef WINDOWS_SYS #ifdef WINDOWS_SYS
memset (&mTid, 0, sizeof(mTid)); memset (&mTid, 0, sizeof(mTid));
#else #else
@ -82,8 +80,7 @@ RsThread::RsThread () : mMutex("RsThread")
bool RsThread::isRunning() bool RsThread::isRunning()
{ {
// do we need a mutex for this ? // do we need a mutex for this ?
int sval =0; int sval = mHasStoppedSemaphore.value() ;
sem_getvalue(&mHasStoppedSemaphore,&sval) ;
#ifdef DEBUG_THREADS #ifdef DEBUG_THREADS
THREAD_DEBUG << " isRunning(): returning " << !sval << std::endl; THREAD_DEBUG << " isRunning(): returning " << !sval << std::endl;
@ -93,12 +90,7 @@ bool RsThread::isRunning()
bool RsThread::shouldStop() bool RsThread::shouldStop()
{ {
int sval =0; int sval = mShouldStopSemaphore.value() ;
sem_getvalue(&mShouldStopSemaphore,&sval) ;
#ifdef DEBUG_THREADS
THREAD_DEBUG << " shouldStop(): returning " << (sval > 0) << " (sval=" << sval << ") " << std::endl;
#endif
return sval > 0; return sval > 0;
} }
@ -108,8 +100,7 @@ void RsTickingThread::shutdown()
THREAD_DEBUG << "pqithreadstreamer::shutdown()" << std::endl; THREAD_DEBUG << "pqithreadstreamer::shutdown()" << std::endl;
#endif #endif
int sval =0; int sval = mHasStoppedSemaphore.value() ;
sem_getvalue(&mHasStoppedSemaphore,&sval) ;
if(sval > 0) if(sval > 0)
{ {
@ -127,7 +118,7 @@ void RsThread::ask_for_stop()
#ifdef DEBUG_THREADS #ifdef DEBUG_THREADS
THREAD_DEBUG << " calling stop" << std::endl; THREAD_DEBUG << " calling stop" << std::endl;
#endif #endif
sem_post(&mShouldStopSemaphore) ; mShouldStopSemaphore.post();
} }
void RsTickingThread::fullstop() void RsTickingThread::fullstop()
@ -137,7 +128,7 @@ void RsTickingThread::fullstop()
#ifdef DEBUG_THREADS #ifdef DEBUG_THREADS
THREAD_DEBUG << " waiting stop" << std::endl; THREAD_DEBUG << " waiting stop" << std::endl;
#endif #endif
sem_wait(&mHasStoppedSemaphore) ; mHasStoppedSemaphore.wait();
#ifdef DEBUG_THREADS #ifdef DEBUG_THREADS
THREAD_DEBUG << " finished!" << std::endl; THREAD_DEBUG << " finished!" << std::endl;
#endif #endif
@ -147,12 +138,11 @@ void RsThread::start()
pthread_t tid; pthread_t tid;
void *data = (void *)this ; void *data = (void *)this ;
RS_STACK_MUTEX(mMutex) ;
#ifdef DEBUG_THREADS #ifdef DEBUG_THREADS
THREAD_DEBUG << "pqithreadstreamer::start() initing should_stop=0, has_stopped=1" << std::endl; THREAD_DEBUG << "pqithreadstreamer::start() initing should_stop=0, has_stopped=1" << std::endl;
#endif #endif
sem_init(&mHasStoppedSemaphore,0,0) ; mHasStoppedSemaphore.set(0) ;
mShouldStopSemaphore.set(0) ;
int err ; int err ;
@ -164,7 +154,7 @@ void RsThread::start()
else else
{ {
THREAD_DEBUG << "Fatal error: pthread_create could not create a thread. Error returned: " << err << " !!!!!!!" << std::endl; THREAD_DEBUG << "Fatal error: pthread_create could not create a thread. Error returned: " << err << " !!!!!!!" << std::endl;
sem_init(&mHasStoppedSemaphore,0,1) ; mHasStoppedSemaphore.set(1) ;
} }
} }
@ -175,13 +165,11 @@ RsTickingThread::RsTickingThread()
#ifdef DEBUG_THREADS #ifdef DEBUG_THREADS
THREAD_DEBUG << "RsTickingThread::RsTickingThread()" << std::endl; THREAD_DEBUG << "RsTickingThread::RsTickingThread()" << std::endl;
#endif #endif
sem_init(&mShouldStopSemaphore,0,0) ;
} }
void RsSingleJobThread::runloop() void RsSingleJobThread::runloop()
{ {
sem_init(&mShouldStopSemaphore,0,0) ; mShouldStopSemaphore.set(0) ;
run() ; run() ;
} }
@ -190,7 +178,7 @@ void RsTickingThread::runloop()
#ifdef DEBUG_THREADS #ifdef DEBUG_THREADS
THREAD_DEBUG << "pqithreadstream::runloop()" << std::endl; THREAD_DEBUG << "pqithreadstream::runloop()" << std::endl;
#endif #endif
sem_init(&mShouldStopSemaphore,0,0) ; mShouldStopSemaphore.set(0) ;
while(1) while(1)
{ {
@ -199,7 +187,7 @@ void RsTickingThread::runloop()
#ifdef DEBUG_THREADS #ifdef DEBUG_THREADS
THREAD_DEBUG << "pqithreadstreamer::runloop(): asked to stop. setting hasStopped=1, and returning. Thread ends." << std::endl; THREAD_DEBUG << "pqithreadstreamer::runloop(): asked to stop. setting hasStopped=1, and returning. Thread ends." << std::endl;
#endif #endif
sem_post(&mHasStoppedSemaphore) ; mHasStoppedSemaphore.post();
return ; return ;
} }

View File

@ -29,7 +29,9 @@
#include <inttypes.h> #include <inttypes.h>
#include <string> #include <string>
#include <iostream> #include <iostream>
#include <unistd.h>
#include <semaphore.h> #include <semaphore.h>
#include <util/rsmemory.h>
/* RsIface Thread Wrappers */ /* RsIface Thread Wrappers */
@ -168,6 +170,58 @@ class RsStackMutex
// //
#define RS_STACK_MUTEX(m) RsStackMutex __local_retroshare_mutex(m,__PRETTY_FUNCTION__,__FILE__,__LINE__) #define RS_STACK_MUTEX(m) RsStackMutex __local_retroshare_mutex(m,__PRETTY_FUNCTION__,__FILE__,__LINE__)
// This class handles a Mutex-based semaphore, that makes it cross plateform.
class RsSemaphore
{
class RsSemStruct
{
public:
RsSemStruct() : mtx("Semaphore mutex"), val(0) {}
RsMutex mtx ;
uint32_t val ;
};
public:
RsSemaphore()
{
s = new RsSemStruct ;
}
~RsSemaphore()
{
delete s ;
}
void set(uint32_t i)
{
RS_STACK_MUTEX(s->mtx) ;
s->val = i ;
}
void post()
{
RS_STACK_MUTEX(s->mtx) ;
++(s->val) ;
}
uint32_t value()
{
RS_STACK_MUTEX(s->mtx) ;
return s->val ;
}
void wait()
{
while(value() == 0)
usleep(1000) ;
post() ;
}
private:
RsSemStruct *s ;
};
class RsThread; class RsThread;
/* to create a thread! */ /* to create a thread! */
@ -198,11 +252,10 @@ class RsThread
protected: protected:
virtual void runloop() =0; /* called once the thread is started. Should be overloaded by subclasses. */ virtual void runloop() =0; /* called once the thread is started. Should be overloaded by subclasses. */
sem_t mHasStoppedSemaphore; RsSemaphore mHasStoppedSemaphore;
sem_t mShouldStopSemaphore; RsSemaphore mShouldStopSemaphore;
static void *rsthread_init(void*) ; static void *rsthread_init(void*) ;
RsMutex mMutex;
pthread_t mTid; pthread_t mTid;
}; };