From b6ce3ab7b29a8e5b78d52c05804f0ddc2c0575e6 Mon Sep 17 00:00:00 2001 From: csoler Date: Tue, 10 Mar 2015 21:16:41 +0000 Subject: [PATCH] attempt to fix the deadlock situations in pqithreadstreamer. Also solves the crash when quitting git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@8009 b45a01b8-16f6-495d-af2f-9b41ad6348cc --- libretroshare/src/pqi/pqiperson.cc | 4 +- libretroshare/src/pqi/pqithreadstreamer.cc | 113 ++++++++++++++------- libretroshare/src/pqi/pqithreadstreamer.h | 10 +- 3 files changed, 81 insertions(+), 46 deletions(-) diff --git a/libretroshare/src/pqi/pqiperson.cc b/libretroshare/src/pqi/pqiperson.cc index e868e4eeb..c0cc3152c 100644 --- a/libretroshare/src/pqi/pqiperson.cc +++ b/libretroshare/src/pqi/pqiperson.cc @@ -365,7 +365,7 @@ int pqiperson::handleNotifyEvent_locked(NetInterface *ni, int newState, const s { pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::notifyEvent() Id: " + PeerId().toStdString() + " CONNECT_FAILED->marking so!"); - activepqi->stop(); // STOP THREAD. + activepqi->shutdown(); // STOP THREAD. active = false; activepqi = NULL; } @@ -410,7 +410,7 @@ int pqiperson::reset_locked() std::map::iterator it; for(it = kids.begin(); it != kids.end(); ++it) { - (it->second) -> stop(); // STOP THREAD. + (it->second) -> shutdown(); // STOP THREAD. (it->second) -> reset(); } diff --git a/libretroshare/src/pqi/pqithreadstreamer.cc b/libretroshare/src/pqi/pqithreadstreamer.cc index 8f77338c4..5a45348eb 100644 --- a/libretroshare/src/pqi/pqithreadstreamer.cc +++ b/libretroshare/src/pqi/pqithreadstreamer.cc @@ -35,8 +35,10 @@ pqithreadstreamer::pqithreadstreamer(PQInterface *parent, RsSerialiser *rss, con :pqistreamer(rss, id, bio_in, bio_flags_in), mParent(parent), mThreadMutex("pqithreadstreamer"), mTimeout(0) { mTimeout = DEFAULT_STREAMER_TIMEOUT; - mSleepPeriod = DEFAULT_STREAMER_SLEEP; - return; + mSleepPeriod = DEFAULT_STREAMER_SLEEP; + + sem_init(&mShouldStopSemaphore,0,0) ; + sem_init(&mHasStoppedSemaphore,0,1) ; } bool pqithreadstreamer::RecvItem(RsItem *item) @@ -54,9 +56,16 @@ int pqithreadstreamer::tick() void pqithreadstreamer::start() { - mToRun = true; +// mToRun = true; - RsThread::start(); + std::cerr << "pqithreadstreamer::run()" << std::endl; + std::cerr << " initing should_stop=0" << std::endl; + std::cerr << " initing has_stopped=1" << std::endl; + + sem_init(&mShouldStopSemaphore,0,0) ; + sem_init(&mHasStoppedSemaphore,0,0) ; + + RsThread::start(); } void pqithreadstreamer::run() @@ -64,63 +73,89 @@ void pqithreadstreamer::run() std::cerr << "pqithreadstream::run()"; std::cerr << std::endl; - { - RsStackMutex stack(mThreadMutex); - mRunning = true; - } while(1) { { - RsStackMutex stack(mThreadMutex); - if (!mToRun) - { - std::cerr << "pqithreadstream::run() stopping"; - std::cerr << std::endl; + int sval =0; + sem_getvalue(&mShouldStopSemaphore,&sval) ; - mRunning = false; - return; + if(sval > 0) + { + std::cerr << "pqithreadstreamer::run(): asked to stop." << std::endl; + std::cerr << " setting hasStopped=1" << std::endl; + sem_post(&mHasStoppedSemaphore) ; + return ; } + +// RsStackMutex stack(mThreadMutex); +// +// if (!mToRun) +// { +// std::cerr << "pqithreadstream::run() stopping"; +// std::cerr << std::endl; +// +// mRunning = false; +// return; +// } } data_tick(); } } -void pqithreadstreamer::stop() +//bool pqithreadstreamer::threadrunning() +//{ +// RsStackMutex stack(mThreadMutex); +// return mRunning ; +//} +void pqithreadstreamer::shutdown() { - //RsStackMutex stack(mThreadMutex); + std::cerr << "pqithreadstreamer::stop()" << std::endl; - std::cerr << "pqithreadstream::stop()"; - std::cerr << std::endl; + int sval =0; + sem_getvalue(&mHasStoppedSemaphore,&sval) ; - mToRun = false; + if(sval > 0) + { + std::cerr << " thread not running. Quit." << std::endl; + return ; + } + + + std::cerr << " calling stop" << std::endl; + sem_post(&mShouldStopSemaphore) ; } void pqithreadstreamer::fullstop() { - while(1) - { - { - RsStackMutex stack(mThreadMutex); + shutdown() ; - mToRun = false ; + std::cerr << " waiting stop" << std::endl; + sem_wait(&mHasStoppedSemaphore) ; + std::cerr << " finished!" << std::endl; - if (!mRunning) - { - std::cerr << "pqithreadstream::fullstop() complete"; - std::cerr << std::endl; - return; - } - } - usleep(1000); - } +// while(1) +// { +// { +// RsStackMutex stack(mThreadMutex); +// +// mToRun = false ; +// // if (!mRunning) +// // { +// // std::cerr << "pqithreadstream::fullstop() complete"; +// // std::cerr << std::endl; +// // return; +// // } ; +// } +// usleep(1000); +// } } -bool pqithreadstreamer::threadrunning() -{ - RsStackMutex stack(mThreadMutex); - return mRunning; -} +//bool pqithreadstreamer::threadrunning() +//{ +// RsStackMutex stack(mThreadMutex); +// return mRunning; +//} int pqithreadstreamer::data_tick() diff --git a/libretroshare/src/pqi/pqithreadstreamer.h b/libretroshare/src/pqi/pqithreadstreamer.h index dab98f32e..79866d04c 100644 --- a/libretroshare/src/pqi/pqithreadstreamer.h +++ b/libretroshare/src/pqi/pqithreadstreamer.h @@ -26,6 +26,7 @@ #ifndef MRK_PQI_THREAD_STREAMER_HEADER #define MRK_PQI_THREAD_STREAMER_HEADER +#include #include "pqi/pqistreamer.h" #include "util/rsthreads.h" @@ -36,9 +37,9 @@ class pqithreadstreamer: public pqistreamer, public RsThread virtual void run(); virtual void start(); -virtual void stop(); +virtual void shutdown(); virtual void fullstop(); -virtual bool threadrunning(); +//virtual bool threadrunning(); virtual bool RecvItem(RsItem *item); virtual int tick(); @@ -54,9 +55,8 @@ int data_tick(); private: /* thread variables */ RsMutex mThreadMutex; - bool mRunning; - bool mToRun; - + sem_t mShouldStopSemaphore; + sem_t mHasStoppedSemaphore; }; #endif //MRK_PQI_THREAD_STREAMER_HEADER