fixed a few bugs in checking for unused groups

csoler 2020-11-25 23:03:25 +01:00
parent 2c7ee7ebeb
commit f21b57b643
6 changed files with 123 additions and 104 deletions

View File

@@ -148,7 +148,6 @@ RsGenExchange::RsGenExchange(
mAuthenPolicy(authenPolicy),
mCleaning(false),
mLastClean((int)time(NULL) - (int)(RSRandom::random_u32() % MSG_CLEANUP_PERIOD)), // this helps unsynchronising the checks for the different services
mMsgCleanUp(NULL),
mChecking(false),
mCheckStarted(false),
mLastCheck((int)time(NULL) - (int)(RSRandom::random_u32() % INTEGRITY_CHECK_PERIOD) + 120), // this helps unsynchronising the checks for the different services, with 2 min security to avoid checking right away before statistics come up.
@@ -260,30 +259,16 @@ void RsGenExchange::tick()
// Cleanup unused data. This is only needed when auto-synchronization is enabled, which is not the case
// for identities. This is why identities do their own cleaning.
now = time(NULL);
if((mNetService->msgAutoSync() || mNetService->grpAutoSync())
&& ((mLastClean + MSG_CLEANUP_PERIOD < now) || mCleaning))
if( (mNetService && (mNetService->msgAutoSync() || mNetService->grpAutoSync())) && (mLastClean + MSG_CLEANUP_PERIOD < now) )
{
if(mMsgCleanUp)
{
RS_STACK_MUTEX(mGenMtx);
RsGxsCleanUp(mDataStore,this,1).clean(mNextGroupToCheck); // no need to lock here, because all access below (RsGenExchange, RsDataStore) are properly mutexed
if(mMsgCleanUp->clean())
{
mCleaning = false;
delete mMsgCleanUp;
mMsgCleanUp = NULL;
mLastClean = time(NULL);
}
}
else
{
mMsgCleanUp = new RsGxsCleanUp(mDataStore, this, 1);
mCleaning = true;
}
}
RS_STACK_MUTEX(mGenMtx) ;
mLastClean = now;
}
now = time(NULL);
if(mChecking || (mLastCheck + INTEGRITY_CHECK_PERIOD < now))
{
mLastCheck = time(NULL);
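The new tick() code above replaces the long-lived mMsgCleanUp/mCleaning state with a time-sliced, resumable cleanup driven by a group cursor (mNextGroupToCheck). Below is a minimal sketch of that pattern, using invented names (PeriodicCleaner, cleanStep) rather than the actual RsGenExchange types:

#include <ctime>
#include <string>

struct PeriodicCleaner                 // illustrative stand-in, not RsGenExchange
{
    std::time_t lastClean = 0;
    std::time_t period    = 60 * 31;   // stand-in for MSG_CLEANUP_PERIOD
    std::string nextGroup;             // cursor, playing the role of mNextGroupToCheck

    // cleanStep is assumed to process groups starting at the cursor until a small
    // time budget runs out, updating the cursor so the next pass resumes there.
    template<typename CleanStep>
    void tick(CleanStep cleanStep)
    {
        const std::time_t now = std::time(nullptr);
        if(lastClean + period < now)
        {
            cleanStep(nextGroup);      // may stop part-way through the group list
            lastClean = now;           // re-arm the timer whether or not the pass finished
        }
    }
};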

View File

@@ -972,12 +972,11 @@ private:
bool mCleaning;
rstime_t mLastClean;
RsGxsCleanUp* mMsgCleanUp;
bool mChecking, mCheckStarted;
rstime_t mLastCheck;
RsGxsIntegrityCheck* mIntegrityCheck;
RsGxsGroupId mNextGroupToCheck ;
protected:
enum CreateStatus { CREATE_FAIL, CREATE_SUCCESS, CREATE_FAIL_TRY_LATER };

View File

@@ -50,41 +50,53 @@ static const uint32_t MAX_GXS_IDS_REQUESTS_NET = 10 ; // max number of reques
RsGxsCleanUp::RsGxsCleanUp(RsGeneralDataService* const dataService, RsGenExchange *genex, uint32_t chunkSize)
: mDs(dataService), mGenExchangeClient(genex), CHUNK_SIZE(chunkSize)
{
RsGxsGrpMetaTemporaryMap grpMeta;
mDs->retrieveGxsGrpMetaData(grpMeta);
for(auto cit=grpMeta.begin();cit != grpMeta.end(); ++cit)
mGrpMeta.push_back(cit->second);
}
bool RsGxsCleanUp::clean()
bool RsGxsCleanUp::clean(RsGxsGroupId& next_group_to_check)
{
uint32_t i = 1;
RsGxsGrpMetaTemporaryMap grpMetaMap;
mDs->retrieveGxsGrpMetaData(grpMetaMap);
rstime_t now = time(NULL);
std::vector<RsGxsGroupId> grps_to_delete;
#ifdef DEBUG_GXSUTIL
uint16_t service_type = mGenExchangeClient->serviceType() ;
GXSUTIL_DEBUG() << " Cleaning up groups in service " << std::hex << service_type << std::dec << std::endl;
GXSUTIL_DEBUG() << " Cleaning up groups in service " << std::hex << service_type << std::dec << " starting at group " << next_group_to_check << std::endl;
#endif
while(!mGrpMeta.empty())
// This method stores/takes the next group to check. This makes it possible to limit group checking to a small part of the total groups
// when a full pass takes too much time. So when arriving here, we must start again from where we left off last time.
if(grpMetaMap.empty()) // nothing to do.
{
const RsGxsGrpMetaData* grpMeta = mGrpMeta.back();
next_group_to_check.clear();
return true;
}
auto it = next_group_to_check.isNull()?grpMetaMap.begin() : grpMetaMap.find(next_group_to_check);
if(it == grpMetaMap.end()) // group wasn't found
it = grpMetaMap.begin();
bool full_round = false; // did we have the time to test all groups?
next_group_to_check = it->first; // covers the case where next_group_to_check is null or not found
while(true) // check all groups, starting from the one passed as parameter
{
const RsGxsGrpMetaData& grpMeta = *(it->second);
// first check if we keep the group or not
if(!mGenExchangeClient->service_checkIfGroupIsStillUsed(*grpMeta))
if(!mGenExchangeClient->service_checkIfGroupIsStillUsed(grpMeta))
{
#ifdef DEBUG_GXSUTIL
std::cerr << " Scheduling group " << grpMeta->mGroupId << " for removal." << std::endl;
std::cerr << " Scheduling group " << grpMeta.mGroupId << " for removal." << std::endl;
#endif
grps_to_delete.push_back(grpMeta->mGroupId);
grps_to_delete.push_back(grpMeta.mGroupId);
}
else
{
const RsGxsGroupId& grpId = grpMeta->mGroupId;
mGrpMeta.pop_back();
const RsGxsGroupId& grpId = grpMeta.mGroupId;
GxsMsgReq req;
GxsMsgMetaResult result;
@@ -127,12 +139,12 @@ bool RsGxsCleanUp::clean()
remove &= !(meta->mMsgStatus & GXS_SERV::GXS_MSG_STATUS_KEEP_FOREVER);
// if not subscribed remove messages (can optimise this really)
remove = remove || (grpMeta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_NOT_SUBSCRIBED);
remove = remove || !(grpMeta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED);
remove = remove || (grpMeta.mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_NOT_SUBSCRIBED);
remove = remove || !(grpMeta.mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED);
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << " msg id " << meta->mMsgId << " in grp " << grpId << ": keep_flag=" << bool(meta->mMsgStatus & GXS_SERV::GXS_MSG_STATUS_KEEP_FOREVER)
<< " subscribed: " << bool(grpMeta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED) << " store_period: " << store_period
<< " subscribed: " << bool(grpMeta.mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED) << " store_period: " << store_period
<< " kids: " << have_kids << " now - meta->mPublishTs: " << now - meta->mPublishTs ;
#endif
@@ -153,15 +165,39 @@ bool RsGxsCleanUp::clean()
}
mDs->removeMsgs(messages_to_delete);
}
i++;
if(i > CHUNK_SIZE) break;
++it;
if(it == grpMetaMap.end())
it = grpMetaMap.begin();
// check if we looped already
if(it->first == next_group_to_check)
{
GXSUTIL_DEBUG() << "Had the time to test all groups. Will start again at " << it->first << std::endl;
full_round = true;
break;
}
// now check if we spent too much time on this already
rstime_t tm = time(nullptr);
//if(tm > now + 1) // we spent more than 1 sec on the job already
if(tm > now) // we spent more than 1 sec on the job already
{
GXSUTIL_DEBUG() << "Aborting cleanup because it took too much time already. Next group left to be " << it->first << std::endl;
next_group_to_check = it->first;
full_round = false;
break;
}
}
//mDs->removeGroups(grps_to_delete);
return mGrpMeta.empty();
return full_round;
}
RsGxsIntegrityCheck::RsGxsIntegrityCheck(
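The heart of the new clean() is a resumable round-robin scan: start at the remembered group, wrap around at the end of the map, and stop either after a full round or once a small time budget is spent. The following stand-alone sketch mirrors that control flow with generic types (a std::map<std::string,int> standing in for the group-meta map); it is an illustration under those assumptions, not the actual implementation:

#include <ctime>
#include <map>
#include <string>

// Returns true when every entry was visited in this call (a "full round"),
// false when the time budget ran out and next_to_check marks where to resume.
bool scanChunk(const std::map<std::string, int>& groups,
               std::string& next_to_check,
               std::time_t budget_seconds = 1)
{
    if(groups.empty()) { next_to_check.clear(); return true; }

    auto it = next_to_check.empty() ? groups.begin() : groups.find(next_to_check);
    if(it == groups.end())
        it = groups.begin();                // remembered group vanished: restart

    next_to_check = it->first;              // remember where this pass began
    const std::time_t start = std::time(nullptr);

    while(true)
    {
        // ... inspect/clean it->second here ...

        ++it;
        if(it == groups.end())
            it = groups.begin();            // wrap around

        if(it->first == next_to_check)      // back at the starting group
            return true;                    // full round completed

        if(std::time(nullptr) > start + budget_seconds)
        {
            next_to_check = it->first;      // resume from here next time
            return false;
        }
    }
}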

View File

@@ -36,10 +36,10 @@ class RsGeneralDataService ;
class non_copiable
{
public:
non_copiable() {}
private:
non_copiable& operator=(const non_copiable&) { return *this ;}
non_copiable(const non_copiable&) {}
};
template<class IdClass,class IdData>
@@ -55,7 +55,7 @@ public:
{
for(typename t_RsGxsGenericDataTemporaryMap<IdClass,IdData>::iterator it = this->begin();it!=this->end();++it)
if(it->second != NULL)
delete it->second ;
std::map<IdClass,IdData*>::clear() ;
}
@@ -75,7 +75,7 @@ public:
for(typename t_RsGxsGenericDataTemporaryMapVector<T>::iterator it = this->begin();it!=this->end();++it)
{
for(uint32_t i=0;i<it->second.size();++i)
delete it->second[i] ;
it->second.clear();
}
@@ -113,12 +113,12 @@ typedef t_RsGxsGenericDataTemporaryList<RsNxsMsg> RsNxsMsgDa
inline RsGxsGrpMsgIdPair getMsgIdPair(RsNxsMsg& msg)
{
return RsGxsGrpMsgIdPair(std::make_pair(msg.grpId, msg.msgId));
}
inline RsGxsGrpMsgIdPair getMsgIdPair(RsGxsMsgItem& msg)
{
return RsGxsGrpMsgIdPair(std::make_pair(msg.meta.mGroupId, msg.meta.mMsgId));
}
/*!
@@ -129,28 +129,27 @@ class RsGxsCleanUp
{
public:
/*!
*
* @param dataService
* @param genex
* @param chunkSize
*/
RsGxsCleanUp(RsGeneralDataService* const dataService, RsGenExchange *genex, uint32_t chunkSize);
/*!
* After construction this should be called to progress deletions.
* Deletion proceeds one chunk of groups at a time.
* @return true if all groups could be processed in this pass, false otherwise
*/
bool clean();
bool clean(RsGxsGroupId &last_checked_group);
private:
RsGeneralDataService* const mDs;
RsGenExchange *mGenExchangeClient;
uint32_t CHUNK_SIZE;
std::vector<const RsGxsGrpMetaData*> mGrpMeta;
};
/*!
@@ -160,52 +159,52 @@ private:
class RsGxsIntegrityCheck : public RsThread
{
enum CheckState { CheckStart, CheckChecking };
public:
/*!
*
* @param dataService
* @param genex
* @param gxsSerialiser
* @param gixs
*/
RsGxsIntegrityCheck( RsGeneralDataService* const dataService,
RsGenExchange *genex, RsSerialType& gxsSerialiser,
RsGixs *gixs);
bool check();
bool isDone();
void run();
void getDeletedIds(std::list<RsGxsGroupId>& grpIds, std::map<RsGxsGroupId, std::set<RsGxsMessageId> > &msgIds);
private:
RsGeneralDataService* const mDs;
RsGenExchange *mGenExchangeClient;
#ifdef RS_DEEP_CHANNEL_INDEX
RsSerialType& mSerializer;
#endif
bool mDone;
RsMutex mIntegrityMutex;
std::list<RsGxsGroupId> mDeletedGrps;
std::map<RsGxsGroupId, std::set<RsGxsMessageId> > mDeletedMsgs;
RsGixs* mGixs;
};
class GroupUpdate
{
public:
GroupUpdate() : oldGrpMeta(NULL), newGrp(NULL), validUpdate(false)
{}
const RsGxsGrpMetaData* oldGrpMeta;
RsNxsGrp* newGrp;
bool validUpdate;
};
class GroupUpdatePublish
@@ -213,8 +212,8 @@ class GroupUpdatePublish
public:
GroupUpdatePublish(RsGxsGrpItem* item, uint32_t token)
: grpItem(item), mToken(token) {}
RsGxsGrpItem* grpItem;
uint32_t mToken;
};
class GroupDeletePublish
@@ -222,8 +221,8 @@ class GroupDeletePublish
public:
GroupDeletePublish(const RsGxsGroupId& grpId, uint32_t token)
: mGroupId(grpId), mToken(token) {}
RsGxsGroupId mGroupId;
uint32_t mToken;
};
@@ -232,7 +231,7 @@ class MsgDeletePublish
public:
MsgDeletePublish(const GxsMsgReq& msgs, uint32_t token)
: mMsgs(msgs), mToken(token) {}
GxsMsgReq mMsgs ;
uint32_t mToken;
};
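Most of the remaining hunks in this header are whitespace-only, but they touch the small RAII "temporary" containers used throughout the file: maps and vectors of owning raw pointers that delete their values on clear() and on destruction, made non-copyable so ownership cannot be duplicated. A hedged sketch of that idiom with illustrative names (not the t_RsGxsGenericDataTemporaryMap template itself):

#include <map>

template<class Id, class Data>
class TemporaryPtrMap : public std::map<Id, Data*>   // owning map of raw pointers
{
public:
    TemporaryPtrMap() = default;
    TemporaryPtrMap(const TemporaryPtrMap&)            = delete;  // non-copyable,
    TemporaryPtrMap& operator=(const TemporaryPtrMap&) = delete;  // like non_copiable above

    ~TemporaryPtrMap() { clear(); }

    void clear()
    {
        for(auto& entry : *this)
            delete entry.second;                      // free each owned value
        std::map<Id, Data*>::clear();
    }
};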

View File

@@ -1724,7 +1724,7 @@ void p3GxsCircles::addCircleIdToList(const RsGxsCircleId &circleId, uint32_t cir
bool p3GxsCircles::service_checkIfGroupIsStillUsed(const RsGxsGrpMetaData& meta)
{
std::cerr << "p3gxsChannels: Checking unused board: called by GxsCleaning." << std::endl;
std::cerr << "p3gxsChannels: Checking unused circles: called by GxsCleaning." << std::endl;
// request all group infos at once

View File

@@ -48,7 +48,7 @@ RsGxsForums *rsGxsForums = NULL;
#define FORUM_TESTEVENT_DUMMYDATA 0x0001
#define DUMMYDATA_PERIOD 60 // long enough for some RsIdentities to be generated.
#define FORUM_UNUSED_BY_FRIENDS_DELAY (2*30*86400) // unused forums are deleted after 2 months
#define FORUM_UNUSED_BY_FRIENDS_DELAY (2*5*86400) // unused forums are deleted after 10 days
/********************************************************************************/
/******************* Startup / Tick ******************************************/
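For context on the constant above: a delay like FORUM_UNUSED_BY_FRIENDS_DELAY is typically consumed by the service's service_checkIfGroupIsStillUsed() override, which compares the group's last activity against the threshold. The struct and helper below are invented for illustration only and do not reflect the actual RetroShare metadata fields:

#include <ctime>

struct GroupActivity                    // hypothetical stand-in for group metadata
{
    std::time_t lastPostTs;             // last message seen in the group
    std::time_t receivedTs;             // when the group itself was received
};

static const std::time_t UNUSED_DELAY = 2 * 5 * 86400;    // 864000 s = 10 days

bool groupStillUsed(const GroupActivity& g)
{
    const std::time_t now          = std::time(nullptr);
    const std::time_t lastActivity = (g.lastPostTs > g.receivedTs) ? g.lastPostTs : g.receivedTs;
    return now < lastActivity + UNUSED_DELAY;              // keep while recently active
}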