attempt to fix token queue growth in circles

This commit is contained in:
csoler 2021-01-23 22:54:29 +01:00
parent 4be2ed548c
commit 53f0c396e7
2 changed files with 59 additions and 38 deletions

View File

@ -67,6 +67,7 @@ static const uint32_t INTEGRITY_CHECK_PERIOD = 60*31; // 31 minutes
/* /*
* #define GEN_EXCH_DEBUG 1 * #define GEN_EXCH_DEBUG 1
*/ */
#define GEN_EXCH_DEBUG 1
// Data flow in RsGenExchange // Data flow in RsGenExchange
// //
@ -409,6 +410,7 @@ bool RsGenExchange::acknowledgeTokenMsg(const uint32_t& token,
// no dump token as client has ackowledged its completion // no dump token as client has ackowledged its completion
mDataAccess->disposeOfPublicToken(token); mDataAccess->disposeOfPublicToken(token);
mMsgNotify.erase(mit);
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << " found grpId=" << msgId.first <<", msgId=" << msgId.second << std::endl; std::cerr << " found grpId=" << msgId.first <<", msgId=" << msgId.second << std::endl;
@ -441,6 +443,7 @@ bool RsGenExchange::acknowledgeTokenGrp(const uint32_t& token, RsGxsGroupId& grp
// no dump token as client has ackowledged its completion // no dump token as client has ackowledged its completion
mDataAccess->disposeOfPublicToken(token); mDataAccess->disposeOfPublicToken(token);
mGrpNotify.erase(mit);
#ifdef GEN_EXCH_DEBUG #ifdef GEN_EXCH_DEBUG
std::cerr << " found grpId=" << grpId << std::endl; std::cerr << " found grpId=" << grpId << std::endl;
@ -2158,7 +2161,8 @@ void RsGenExchange::processMsgMetaChanges()
{ {
RS_STACK_MUTEX(mGenMtx); RS_STACK_MUTEX(mGenMtx);
mMsgNotify.insert(std::make_pair(token, m.msgId)); //mMsgNotify.insert(std::make_pair(token, m.msgId));// (csoler) Is that needed??
mDataAccess->disposeOfPublicToken(token);
} }
} }
@ -2210,7 +2214,8 @@ void RsGenExchange::processGrpMetaChanges()
{ {
RS_STACK_MUTEX(mGenMtx); RS_STACK_MUTEX(mGenMtx);
mGrpNotify.insert(std::make_pair(token, g.grpId)); mDataAccess->disposeOfPublicToken(token);
//mGrpNotify.insert(std::make_pair(token, g.grpId)); // (csoler) I'm not sure that is even useful
} }
} }
@ -2594,8 +2599,7 @@ void RsGenExchange::processGroupDelete()
typedef std::pair<bool, RsGxsGroupId> GrpNote; typedef std::pair<bool, RsGxsGroupId> GrpNote;
std::map<uint32_t, GrpNote> toNotify; std::map<uint32_t, GrpNote> toNotify;
std::vector<GroupDeletePublish>::iterator vit = mGroupDeletePublish.begin(); for( auto vit = mGroupDeletePublish.begin();vit != mGroupDeletePublish.end(); ++vit)
for(; vit != mGroupDeletePublish.end(); ++vit)
{ {
std::vector<RsGxsGroupId> gprIds; std::vector<RsGxsGroupId> gprIds;
gprIds.push_back(vit->mGroupId); gprIds.push_back(vit->mGroupId);
@ -2603,18 +2607,16 @@ void RsGenExchange::processGroupDelete()
toNotify.insert(std::make_pair( vit->mToken, GrpNote(true, vit->mGroupId))); toNotify.insert(std::make_pair( vit->mToken, GrpNote(true, vit->mGroupId)));
} }
std::list<RsGxsGroupId> grpDeleted; std::list<RsGxsGroupId> grpDeleted;
std::map<uint32_t, GrpNote>::iterator mit = toNotify.begin(); for(auto mit=toNotify.begin(); mit != toNotify.end(); ++mit)
for(; mit != toNotify.end(); ++mit)
{ {
GrpNote& note = mit->second; GrpNote& note = mit->second;
RsTokenService::GxsRequestStatus status = RsTokenService::GxsRequestStatus status =
note.first ? RsTokenService::COMPLETE note.first ? RsTokenService::COMPLETE
: RsTokenService::FAILED; : RsTokenService::FAILED;
mGrpNotify.insert(std::make_pair(mit->first, note.second));
mDataAccess->updatePublicRequestStatus(mit->first, status); mDataAccess->updatePublicRequestStatus(mit->first, status);
mDataAccess->disposeOfPublicToken(mit->first);
if(note.first) if(note.first)
grpDeleted.push_back(note.second); grpDeleted.push_back(note.second);
@ -2632,39 +2634,52 @@ void RsGenExchange::processGroupDelete()
void RsGenExchange::processMessageDelete() void RsGenExchange::processMessageDelete()
{ {
RS_STACK_MUTEX(mGenMtx) ; RS_STACK_MUTEX(mGenMtx) ;
#ifdef TODO
typedef std::pair<bool, RsGxsGroupId> GrpNote; struct MsgNote {
std::map<uint32_t, GrpNote> toNotify; MsgNote(bool s,const GxsMsgReq& mid) : state(s),msgIds(mid){}
bool state;
GxsMsgReq msgIds;
};
std::map<uint32_t, MsgNote> toNotify;
for( auto vit = mMsgDeletePublish.begin(); vit != mMsgDeletePublish.end(); ++vit)
{
uint32_t token = (*vit).mToken;
bool res = mDataStore->removeMsgs( (*vit).mMsgs );
#ifdef GEN_EXCH_DEBUG
for(auto mit: (*vit).mMsgs)
{
std::cerr << "Attempt to delete messages: token=" << token << std::endl;
for(const auto& msg:mit.second)
std::cerr << " grpId=" << mit.first << ", msgId=" << msg << std::endl;
std::cerr << " Result: " << res << std::endl;
}
#endif #endif
toNotify.insert(std::make_pair(token, MsgNote(res,(*vit).mMsgs)));
}
for( std::vector<MsgDeletePublish>::iterator vit = mMsgDeletePublish.begin(); vit != mMsgDeletePublish.end(); ++vit) std::list<GxsMsgReq> msgDeleted;
{
#ifdef TODO
uint32_t token = (*vit).mToken;
const RsGxsGroupId& groupId = gdp.grpItem->meta.mGroupId;
toNotify.insert(std::make_pair( token, GrpNote(true, groupId)));
#endif
mDataStore->removeMsgs( (*vit).mMsgs );
}
// std::list<RsGxsGroupId> grpDeleted; for(auto mit = toNotify.begin(); mit != toNotify.end(); ++mit)
// std::map<uint32_t, GrpNote>::iterator mit = toNotify.begin(); {
// for(; mit != toNotify.end(); ++mit) MsgNote& note = mit->second;
// { RsTokenService::GxsRequestStatus status =
// GrpNote& note = mit->second; note.state ? RsTokenService::COMPLETE
// uint8_t status = note.first ? RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE : RsTokenService::FAILED;
// : RsTokenService::GXS_REQUEST_V2_STATUS_FAILED;
//
// mGrpNotify.insert(std::make_pair(mit->first, note.second));
// mDataAccess->updatePublicRequestStatus(mit->first, status);
//
// if(note.first)
// grpDeleted.push_back(note.second);
// }
for(uint32_t i=0;i<mMsgDeletePublish.size();++i) mDataAccess->updatePublicRequestStatus(mit->first, status);
for(auto it(mMsgDeletePublish[i].mMsgs.begin());it!=mMsgDeletePublish[i].mMsgs.end();++it) mDataAccess->disposeOfPublicToken(mit->first);
mNotifications.push_back(new RsGxsGroupChange(RsGxsNotify::TYPE_MESSAGE_DELETED,it->first, false));
if(note.state)
msgDeleted.push_back(note.msgIds);
}
for(const auto& msgreq:msgDeleted)
for(const auto& msgit:msgreq)
for(const auto& msg:msgit.second)
mNotifications.push_back(new RsGxsMsgChange(RsGxsNotify::TYPE_MESSAGE_DELETED,msgit.first,msg, false));
mMsgDeletePublish.clear(); mMsgDeletePublish.clear();
} }
@ -2958,7 +2973,11 @@ void RsGenExchange::publishGrps()
uint32_t RsGenExchange::generatePublicToken() uint32_t RsGenExchange::generatePublicToken()
{ {
return mDataAccess->generatePublicToken(); uint32_t token = mDataAccess->generatePublicToken();
#ifdef GEN_EXCH_DEBUG
std::cerr << "New token generated: " << token << " in RsGenExchange::generatePublicToken()" << std::endl;
#endif
return token;
} }
bool RsGenExchange::updatePublicRequestStatus( bool RsGenExchange::updatePublicRequestStatus(

View File

@ -1801,6 +1801,8 @@ uint32_t RsGxsDataAccess::generatePublicToken()
mPublicToken[token] = PENDING ; mPublicToken[token] = PENDING ;
#ifdef DATA_DEBUG #ifdef DATA_DEBUG
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": Adding new public token " << token << " in PENDING state. Completed tokens: " << mCompletedRequests.size() << " Size of mPublicToken: " << mPublicToken.size() << std::endl; GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": Adding new public token " << token << " in PENDING state. Completed tokens: " << mCompletedRequests.size() << " Size of mPublicToken: " << mPublicToken.size() << std::endl;
if(mDataStore->serviceType() == 0x218 && token==19)
print_stacktrace();
#endif #endif
} }