attempt to fix the deadlock situations in pqithreadstreamer. Also solves the crash when quitting

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@8009 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2015-03-10 21:16:41 +00:00
parent 4dede651c7
commit b6ce3ab7b2
3 changed files with 81 additions and 46 deletions

View File

@ -365,7 +365,7 @@ int pqiperson::handleNotifyEvent_locked(NetInterface *ni, int newState, const s
{
pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::notifyEvent() Id: " + PeerId().toStdString() + " CONNECT_FAILED->marking so!");
activepqi->stop(); // STOP THREAD.
activepqi->shutdown(); // STOP THREAD.
active = false;
activepqi = NULL;
}
@ -410,7 +410,7 @@ int pqiperson::reset_locked()
std::map<uint32_t, pqiconnect *>::iterator it;
for(it = kids.begin(); it != kids.end(); ++it)
{
(it->second) -> stop(); // STOP THREAD.
(it->second) -> shutdown(); // STOP THREAD.
(it->second) -> reset();
}

View File

@ -36,7 +36,9 @@ pqithreadstreamer::pqithreadstreamer(PQInterface *parent, RsSerialiser *rss, con
{
mTimeout = DEFAULT_STREAMER_TIMEOUT;
mSleepPeriod = DEFAULT_STREAMER_SLEEP;
return;
sem_init(&mShouldStopSemaphore,0,0) ;
sem_init(&mHasStoppedSemaphore,0,1) ;
}
bool pqithreadstreamer::RecvItem(RsItem *item)
@ -54,7 +56,14 @@ int pqithreadstreamer::tick()
void pqithreadstreamer::start()
{
mToRun = true;
// mToRun = true;
std::cerr << "pqithreadstreamer::run()" << std::endl;
std::cerr << " initing should_stop=0" << std::endl;
std::cerr << " initing has_stopped=1" << std::endl;
sem_init(&mShouldStopSemaphore,0,0) ;
sem_init(&mHasStoppedSemaphore,0,0) ;
RsThread::start();
}
@ -64,63 +73,89 @@ void pqithreadstreamer::run()
std::cerr << "pqithreadstream::run()";
std::cerr << std::endl;
{
RsStackMutex stack(mThreadMutex);
mRunning = true;
}
while(1)
{
{
RsStackMutex stack(mThreadMutex);
if (!mToRun)
{
std::cerr << "pqithreadstream::run() stopping";
std::cerr << std::endl;
int sval =0;
sem_getvalue(&mShouldStopSemaphore,&sval) ;
mRunning = false;
return;
if(sval > 0)
{
std::cerr << "pqithreadstreamer::run(): asked to stop." << std::endl;
std::cerr << " setting hasStopped=1" << std::endl;
sem_post(&mHasStoppedSemaphore) ;
return ;
}
// RsStackMutex stack(mThreadMutex);
//
// if (!mToRun)
// {
// std::cerr << "pqithreadstream::run() stopping";
// std::cerr << std::endl;
//
// mRunning = false;
// return;
// }
}
data_tick();
}
}
void pqithreadstreamer::stop()
//bool pqithreadstreamer::threadrunning()
//{
// RsStackMutex stack(mThreadMutex);
// return mRunning ;
//}
void pqithreadstreamer::shutdown()
{
//RsStackMutex stack(mThreadMutex);
std::cerr << "pqithreadstreamer::stop()" << std::endl;
std::cerr << "pqithreadstream::stop()";
std::cerr << std::endl;
int sval =0;
sem_getvalue(&mHasStoppedSemaphore,&sval) ;
mToRun = false;
if(sval > 0)
{
std::cerr << " thread not running. Quit." << std::endl;
return ;
}
std::cerr << " calling stop" << std::endl;
sem_post(&mShouldStopSemaphore) ;
}
void pqithreadstreamer::fullstop()
{
while(1)
{
{
RsStackMutex stack(mThreadMutex);
shutdown() ;
mToRun = false ;
std::cerr << " waiting stop" << std::endl;
sem_wait(&mHasStoppedSemaphore) ;
std::cerr << " finished!" << std::endl;
if (!mRunning)
{
std::cerr << "pqithreadstream::fullstop() complete";
std::cerr << std::endl;
return;
}
}
usleep(1000);
}
// while(1)
// {
// {
// RsStackMutex stack(mThreadMutex);
//
// mToRun = false ;
// // if (!mRunning)
// // {
// // std::cerr << "pqithreadstream::fullstop() complete";
// // std::cerr << std::endl;
// // return;
// // } ;
// }
// usleep(1000);
// }
}
bool pqithreadstreamer::threadrunning()
{
RsStackMutex stack(mThreadMutex);
return mRunning;
}
//bool pqithreadstreamer::threadrunning()
//{
// RsStackMutex stack(mThreadMutex);
// return mRunning;
//}
int pqithreadstreamer::data_tick()

View File

@ -26,6 +26,7 @@
#ifndef MRK_PQI_THREAD_STREAMER_HEADER
#define MRK_PQI_THREAD_STREAMER_HEADER
#include <semaphore.h>
#include "pqi/pqistreamer.h"
#include "util/rsthreads.h"
@ -36,9 +37,9 @@ class pqithreadstreamer: public pqistreamer, public RsThread
virtual void run();
virtual void start();
virtual void stop();
virtual void shutdown();
virtual void fullstop();
virtual bool threadrunning();
//virtual bool threadrunning();
virtual bool RecvItem(RsItem *item);
virtual int tick();
@ -54,9 +55,8 @@ int data_tick();
private:
/* thread variables */
RsMutex mThreadMutex;
bool mRunning;
bool mToRun;
sem_t mShouldStopSemaphore;
sem_t mHasStoppedSemaphore;
};
#endif //MRK_PQI_THREAD_STREAMER_HEADER