simplified GxsIntegrityCheck, in hope that it will remove the crashes. Also moved the deletion to async RsGenExchange, so as to get the proper notifications

This commit is contained in:
csoler 2020-12-01 20:34:23 +01:00
parent a4e3f98a09
commit fa20b9b254
3 changed files with 34 additions and 73 deletions

View File

@ -301,35 +301,22 @@ void RsGenExchange::tick()
{
RS_STACK_MUTEX(mGenMtx) ;
std::list<RsGxsGroupId> grpIds;
std::map<RsGxsGroupId, std::set<RsGxsMessageId> > msgIds;
std::vector<RsGxsGroupId> grpIds;
GxsMsgReq msgIds;
mIntegrityCheck->getDeletedIds(grpIds, msgIds);
if (!grpIds.empty())
if(!msgIds.empty())
{
for(auto& groupId:grpIds)
{
RsGxsGroupChange* gc = new RsGxsGroupChange(RsGxsNotify::TYPE_GROUP_DELETED,groupId, false);
#ifdef GEN_EXCH_DEBUG
std::cerr << " adding the following grp ids to notification: " << std::endl;
for(std::list<RsGxsGroupId>::const_iterator it(grpIds.begin());it!=grpIds.end();++it)
std::cerr << " " << *it << std::endl;
#endif
mNotifications.push_back(gc);
uint32_t token1=0;
deleteMsgs(token1,msgIds);
}
// also notify the network exchange service that these groups no longer exist.
if(mNetService)
mNetService->removeGroups(grpIds) ;
}
for(auto it(msgIds.begin());it!=msgIds.end();++it)
for(auto& msgId:it->second)
if(!grpIds.empty())
for(auto& grpId: grpIds)
{
RsGxsMsgChange* c = new RsGxsMsgChange(RsGxsNotify::TYPE_MESSAGE_DELETED,it->first, msgId, false);
mNotifications.push_back(c);
uint32_t token2=0;
deleteGroup(token2,grpId);
}
delete mIntegrityCheck;

View File

@ -207,13 +207,16 @@ RsGxsIntegrityCheck::RsGxsIntegrityCheck(
void RsGxsIntegrityCheck::run()
{
check();
std::vector<RsGxsGroupId> grps_to_delete;
GxsMsgReq msgs_to_delete;
check(mGenExchangeClient->serviceType(),mGixs,mDs,mDeletedGrps,mDeletedMsgs);
RS_STACK_MUTEX(mIntegrityMutex);
mDone = true;
}
bool RsGxsIntegrityCheck::check()
bool RsGxsIntegrityCheck::check(uint16_t service_type, RsGixs *mgixs, RsGeneralDataService *mds, std::vector<RsGxsGroupId>& grpsToDel, GxsMsgReq& msgsToDel)
{
#ifdef RS_DEEP_CHANNEL_INDEX
bool isGxsChannels = mGenExchangeClient->serviceType() == RS_SERVICE_GXS_TYPE_CHANNELS;
@ -222,8 +225,7 @@ bool RsGxsIntegrityCheck::check()
// first take out all the groups
std::map<RsGxsGroupId, RsNxsGrp*> grp;
mDs->retrieveNxsGrps(grp, true, true);
std::vector<RsGxsGroupId> grpsToDel;
mds->retrieveNxsGrps(grp, true, true);
GxsMsgReq msgIds;
GxsMsgReq grps;
@ -244,7 +246,7 @@ bool RsGxsIntegrityCheck::check()
if(currHash == grp->metaData->mHash)
{
// get all message ids of group
if (mDs->retrieveMsgIds(grp->grpId, msgIds[grp->grpId]) == 1)
if (mds->retrieveMsgIds(grp->grpId, msgIds[grp->grpId]) == 1)
{
// store the group for retrieveNxsMsgs
grps[grp->grpId];
@ -262,8 +264,7 @@ bool RsGxsIntegrityCheck::check()
rsReputations->overallReputationLevel(
grp->metaData->mAuthorId ) >
RsReputationLevel::LOCALLY_NEGATIVE )
used_gxs_ids.insert(std::make_pair(grp->metaData->mAuthorId, RsIdentityUsage(RsServiceType(mGenExchangeClient->serviceType()),
RsIdentityUsage::GROUP_AUTHOR_KEEP_ALIVE,grp->grpId)));
used_gxs_ids.insert(std::make_pair(grp->metaData->mAuthorId, RsIdentityUsage(RsServiceType(service_type), RsIdentityUsage::GROUP_AUTHOR_KEEP_ALIVE,grp->grpId)));
}
}
}
@ -317,33 +318,15 @@ bool RsGxsIntegrityCheck::check()
#endif // def RS_DEEP_CHANNEL_INDEX
}
if( !(grp->metaData->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED) &&
!(grp->metaData->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_ADMIN) &&
!(grp->metaData->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_PUBLISH) )
{
RsGroupNetworkStats stats;
mGenExchangeClient->getGroupNetworkStats(grp->grpId,stats);
if( stats.mSuppliers == 0 && stats.mMaxVisibleCount == 0
&& stats.mGrpAutoSync )
{
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << "Scheduling group \"" << grp->metaData->mGroupName << "\" ID=" << grp->grpId << " in service " << std::hex << mGenExchangeClient->serviceType() << std::dec << " for deletion because it has no suppliers not any visible data at friends." << std::endl;
#endif
grpsToDel.push_back(grp->grpId);
}
}
delete grp;
}
mDs->removeGroups(grpsToDel);
mds->removeGroups(grpsToDel);
// now messages
GxsMsgReq msgsToDel;
GxsMsgResult msgs;
mDs->retrieveNxsMsgs(grps, msgs, false, true);
mds->retrieveNxsMsgs(grps, msgs, false, true);
// check msg ids and messages
GxsMsgReq::iterator msgIdsIt;
@ -455,7 +438,7 @@ bool RsGxsIntegrityCheck::check()
rsReputations->overallReputationLevel(
msg->metaData->mAuthorId ) >
RsReputationLevel::LOCALLY_NEGATIVE )
used_gxs_ids.insert(std::make_pair(msg->metaData->mAuthorId,RsIdentityUsage(RsServiceType(mGenExchangeClient->serviceType()),
used_gxs_ids.insert(std::make_pair(msg->metaData->mAuthorId,RsIdentityUsage(RsServiceType(service_type),
RsIdentityUsage::MESSAGE_AUTHOR_KEEP_ALIVE,
msg->metaData->mGroupId,
msg->metaData->mMsgId,
@ -468,18 +451,9 @@ bool RsGxsIntegrityCheck::check()
}
}
mDs->removeMsgs(msgsToDel);
mds->removeMsgs(msgsToDel);
{
RS_STACK_MUTEX(mIntegrityMutex);
std::vector<RsGxsGroupId>::iterator grpIt;
for(grpIt = grpsToDel.begin(); grpIt != grpsToDel.end(); ++grpIt)
{
mDeletedGrps.push_back(*grpIt);
}
mDeletedMsgs = msgsToDel;
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << "At end of pass, this is the list used GXS ids: " << std::endl;
GXSUTIL_DEBUG() << " requesting them to GXS identity service to enforce loading." << std::endl;
@ -512,9 +486,9 @@ bool RsGxsIntegrityCheck::check()
GXSUTIL_DEBUG() << " requesting ID " << gxs_ids[n].first ;
#endif
if(!mGixs->haveKey(gxs_ids[n].first)) // checks if we have it already in the cache (conservative way to ensure that we atually have it)
if(!mgixs->haveKey(gxs_ids[n].first)) // checks if we have it already in the cache (conservative way to ensure that we atually have it)
{
mGixs->requestKey(gxs_ids[n].first,connected_friends,gxs_ids[n].second);
mgixs->requestKey(gxs_ids[n].first,connected_friends,gxs_ids[n].second);
++nb_requested_not_in_cache ;
#ifdef DEBUG_GXSUTIL
@ -527,7 +501,7 @@ bool RsGxsIntegrityCheck::check()
GXSUTIL_DEBUG() << " ... already in cache" << std::endl;
#endif
}
mGixs->timeStampKey(gxs_ids[n].first,gxs_ids[n].second);
mgixs->timeStampKey(gxs_ids[n].first,gxs_ids[n].second);
gxs_ids[n] = gxs_ids[gxs_ids.size()-1] ;
gxs_ids.pop_back() ;
@ -546,7 +520,7 @@ bool RsGxsIntegrityCheck::isDone()
return mDone;
}
void RsGxsIntegrityCheck::getDeletedIds(std::list<RsGxsGroupId>& grpIds, std::map<RsGxsGroupId, std::set<RsGxsMessageId> >& msgIds)
void RsGxsIntegrityCheck::getDeletedIds(std::vector<RsGxsGroupId>& grpIds, GxsMsgReq& msgIds)
{
RS_STACK_MUTEX(mIntegrityMutex);
grpIds = mDeletedGrps;

View File

@ -175,12 +175,12 @@ public:
RsGenExchange *genex, RsSerialType& gxsSerialiser,
RsGixs *gixs);
bool check();
static bool check(uint16_t service_type, RsGixs *mgixs, RsGeneralDataService *mds, std::vector<RsGxsGroupId>& grpsToDel, GxsMsgReq& msgsToDel);
bool isDone();
void run();
void getDeletedIds(std::list<RsGxsGroupId>& grpIds, std::map<RsGxsGroupId, std::set<RsGxsMessageId> > &msgIds);
void getDeletedIds(std::vector<RsGxsGroupId> &grpIds, GxsMsgReq &msgIds);
private:
@ -191,8 +191,8 @@ private:
#endif
bool mDone;
RsMutex mIntegrityMutex;
std::list<RsGxsGroupId> mDeletedGrps;
std::map<RsGxsGroupId, std::set<RsGxsMessageId> > mDeletedMsgs;
std::vector<RsGxsGroupId> mDeletedGrps;
GxsMsgReq mDeletedMsgs;
RsGixs* mGixs;
};