fixed thread issue in RsGxsCleanupThread that caused random crashes

This commit is contained in:
csoler 2017-06-12 20:36:02 +02:00
parent 1766087f71
commit 79825eb2e2
3 changed files with 88 additions and 81 deletions

View file

@ -190,52 +190,54 @@ void RsGenExchange::tick()
now = time(NULL); now = time(NULL);
if(mChecking || (mLastCheck + INTEGRITY_CHECK_PERIOD < now)) if(mChecking || (mLastCheck + INTEGRITY_CHECK_PERIOD < now))
{ {
if(mIntegrityCheck) mLastCheck = time(NULL);
{ {
if(mIntegrityCheck->isDone()) RS_STACK_MUTEX(mGenMtx) ;
if(!mIntegrityCheck)
{ {
std::list<RsGxsGroupId> grpIds; mIntegrityCheck = new RsGxsIntegrityCheck(mDataStore,this,mGixs);
std::map<RsGxsGroupId, std::vector<RsGxsMessageId> > msgIds; mIntegrityCheck->start("gxs integrity");
mIntegrityCheck->getDeletedIds(grpIds, msgIds); mChecking = true;
if (!grpIds.empty())
{
RS_STACK_MUTEX(mGenMtx) ;
RsGxsGroupChange* gc = new RsGxsGroupChange(RsGxsNotify::TYPE_PROCESSED, false);
gc->mGrpIdList = grpIds;
#ifdef GEN_EXCH_DEBUG
std::cerr << " adding the following grp ids to notification: " << std::endl;
for(std::list<RsGxsGroupId>::const_iterator it(grpIds.begin());it!=grpIds.end();++it)
std::cerr << " " << *it << std::endl;
#endif
mNotifications.push_back(gc);
// also notify the network exchange service that these groups no longer exist.
if(mNetService)
mNetService->removeGroups(grpIds) ;
}
if (!msgIds.empty()) {
RS_STACK_MUTEX(mGenMtx) ;
RsGxsMsgChange* c = new RsGxsMsgChange(RsGxsNotify::TYPE_PROCESSED, false);
c->msgChangeMap = msgIds;
mNotifications.push_back(c);
}
delete mIntegrityCheck;
mIntegrityCheck = NULL;
mLastCheck = time(NULL);
mChecking = false;
} }
} }
else
if(mIntegrityCheck->isDone())
{ {
mIntegrityCheck = new RsGxsIntegrityCheck(mDataStore,this,mGixs); RS_STACK_MUTEX(mGenMtx) ;
mIntegrityCheck->start("gxs integrity");
mChecking = true; std::list<RsGxsGroupId> grpIds;
std::map<RsGxsGroupId, std::vector<RsGxsMessageId> > msgIds;
mIntegrityCheck->getDeletedIds(grpIds, msgIds);
if (!grpIds.empty())
{
RsGxsGroupChange* gc = new RsGxsGroupChange(RsGxsNotify::TYPE_PROCESSED, false);
gc->mGrpIdList = grpIds;
#ifdef GEN_EXCH_DEBUG
std::cerr << " adding the following grp ids to notification: " << std::endl;
for(std::list<RsGxsGroupId>::const_iterator it(grpIds.begin());it!=grpIds.end();++it)
std::cerr << " " << *it << std::endl;
#endif
mNotifications.push_back(gc);
// also notify the network exchange service that these groups no longer exist.
if(mNetService)
mNetService->removeGroups(grpIds) ;
}
if (!msgIds.empty())
{
RsGxsMsgChange* c = new RsGxsMsgChange(RsGxsNotify::TYPE_PROCESSED, false);
c->msgChangeMap = msgIds;
mNotifications.push_back(c);
}
delete mIntegrityCheck;
mIntegrityCheck = NULL;
mChecking = false;
} }
} }
} }

View file

@ -136,6 +136,9 @@ RsGxsIntegrityCheck::RsGxsIntegrityCheck(RsGeneralDataService* const dataService
void RsGxsIntegrityCheck::run() void RsGxsIntegrityCheck::run()
{ {
check(); check();
RsStackMutex stack(mIntegrityMutex);
mDone = true;
} }
bool RsGxsIntegrityCheck::check() bool RsGxsIntegrityCheck::check()
@ -286,71 +289,72 @@ bool RsGxsIntegrityCheck::check()
mDs->removeMsgs(msgsToDel); mDs->removeMsgs(msgsToDel);
RsStackMutex stack(mIntegrityMutex); {
mDone = true; RsStackMutex stack(mIntegrityMutex);
std::vector<RsGxsGroupId>::iterator grpIt; std::vector<RsGxsGroupId>::iterator grpIt;
for(grpIt = grpsToDel.begin(); grpIt != grpsToDel.end(); ++grpIt) for(grpIt = grpsToDel.begin(); grpIt != grpsToDel.end(); ++grpIt)
{ {
mDeletedGrps.push_back(*grpIt); mDeletedGrps.push_back(*grpIt);
} }
mDeletedMsgs = msgsToDel; mDeletedMsgs = msgsToDel;
#ifdef DEBUG_GXSUTIL #ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << "At end of pass, this is the list used GXS ids: " << std::endl; GXSUTIL_DEBUG() << "At end of pass, this is the list used GXS ids: " << std::endl;
GXSUTIL_DEBUG() << " requesting them to GXS identity service to enforce loading." << std::endl; GXSUTIL_DEBUG() << " requesting them to GXS identity service to enforce loading." << std::endl;
#endif #endif
std::list<RsPeerId> connected_friends ; std::list<RsPeerId> connected_friends ;
rsPeers->getOnlineList(connected_friends) ; rsPeers->getOnlineList(connected_friends) ;
std::vector<std::pair<RsGxsId,RsIdentityUsage> > gxs_ids ; std::vector<std::pair<RsGxsId,RsIdentityUsage> > gxs_ids ;
for(std::map<RsGxsId,RsIdentityUsage>::const_iterator it(used_gxs_ids.begin());it!=used_gxs_ids.end();++it) for(std::map<RsGxsId,RsIdentityUsage>::const_iterator it(used_gxs_ids.begin());it!=used_gxs_ids.end();++it)
{ {
gxs_ids.push_back(*it) ; gxs_ids.push_back(*it) ;
#ifdef DEBUG_GXSUTIL #ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << " " << *it << std::endl; GXSUTIL_DEBUG() << " " << *it << std::endl;
#endif #endif
} }
uint32_t nb_requested_not_in_cache = 0; uint32_t nb_requested_not_in_cache = 0;
#ifdef DEBUG_GXSUTIL #ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << " issuing random get on friends for non existing IDs" << std::endl; GXSUTIL_DEBUG() << " issuing random get on friends for non existing IDs" << std::endl;
#endif #endif
// now request a cache update for them, which triggers downloading from friends, if missing. // now request a cache update for them, which triggers downloading from friends, if missing.
for(;nb_requested_not_in_cache<MAX_GXS_IDS_REQUESTS_NET && !gxs_ids.empty();) for(;nb_requested_not_in_cache<MAX_GXS_IDS_REQUESTS_NET && !gxs_ids.empty();)
{ {
uint32_t n = RSRandom::random_u32() % gxs_ids.size() ; uint32_t n = RSRandom::random_u32() % gxs_ids.size() ;
#ifdef DEBUG_GXSUTIL #ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << " requesting ID " << gxs_ids[n] ; GXSUTIL_DEBUG() << " requesting ID " << gxs_ids[n] ;
#endif #endif
if(!mGixs->haveKey(gxs_ids[n].first)) // checks if we have it already in the cache (conservative way to ensure that we atually have it) if(!mGixs->haveKey(gxs_ids[n].first)) // checks if we have it already in the cache (conservative way to ensure that we atually have it)
{ {
mGixs->requestKey(gxs_ids[n].first,connected_friends,gxs_ids[n].second); mGixs->requestKey(gxs_ids[n].first,connected_friends,gxs_ids[n].second);
++nb_requested_not_in_cache ; ++nb_requested_not_in_cache ;
#ifdef DEBUG_GXSUTIL #ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << " ... from cache/net" << std::endl; GXSUTIL_DEBUG() << " ... from cache/net" << std::endl;
#endif #endif
} }
else else
{ {
#ifdef DEBUG_GXSUTIL #ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << " ... already in cache" << std::endl; GXSUTIL_DEBUG() << " ... already in cache" << std::endl;
#endif #endif
} }
mGixs->timeStampKey(gxs_ids[n].first,gxs_ids[n].second); mGixs->timeStampKey(gxs_ids[n].first,gxs_ids[n].second);
gxs_ids[n] = gxs_ids[gxs_ids.size()-1] ; gxs_ids[n] = gxs_ids[gxs_ids.size()-1] ;
gxs_ids.pop_back() ; gxs_ids.pop_back() ;
} }
#ifdef DEBUG_GXSUTIL #ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << " total actual cache requests: "<< nb_requested_not_in_cache << std::endl; GXSUTIL_DEBUG() << " total actual cache requests: "<< nb_requested_not_in_cache << std::endl;
#endif #endif
}
return true; return true;
} }

View file

@ -179,6 +179,7 @@ void RsThread::start(const std::string &threadName)
THREAD_DEBUG << "pqithreadstreamer::start() initing should_stop=0" << std::endl; THREAD_DEBUG << "pqithreadstreamer::start() initing should_stop=0" << std::endl;
#endif #endif
mShouldStopSemaphore.set(0) ; mShouldStopSemaphore.set(0) ;
mHasStoppedSemaphore.set(0) ;
int err ; int err ;