fixed up new version of GxsDataAccess

This commit is contained in:
csoler 2020-04-06 18:34:57 +02:00
parent 2d23a9f251
commit 65af73f8eb
No known key found for this signature in database
GPG key ID: 7BCA522266C0804C
3 changed files with 101 additions and 57 deletions

View file

@ -317,6 +317,15 @@ void RsGxsDataAccess::storeRequest(GxsRequest* req)
req->reqTime = time(NULL); req->reqTime = time(NULL);
mRequestQueue.insert(std::make_pair(req->token,req)); mRequestQueue.insert(std::make_pair(req->token,req));
mPublicToken[req->token] = PENDING;
#ifdef DATA_DEBUG
std::cerr << "Stored request token=" << req->token << " priority = " << static_cast<int>(req->priority) << " Current request Queue is:" ;
for(auto it(mRequestQueue.begin());it!=mRequestQueue.end();++it)
std::cerr << it->first << " (p=" << static_cast<int>(req->priority) << ") ";
std::cerr << std::endl;
std::cerr << "Completed requests waiting for client: " << mCompletedRequests.size() << std::endl;
#endif
} }
RsTokenService::GxsRequestStatus RsGxsDataAccess::requestStatus(uint32_t token) RsTokenService::GxsRequestStatus RsGxsDataAccess::requestStatus(uint32_t token)
@ -326,13 +335,13 @@ RsTokenService::GxsRequestStatus RsGxsDataAccess::requestStatus(uint32_t token)
uint32_t anstype; uint32_t anstype;
rstime_t ts; rstime_t ts;
{ // {
RS_STACK_MUTEX(mDataMutex); // RS_STACK_MUTEX(mDataMutex);
//
// first check public tokens // // first check public tokens
if(mPublicToken.find(token) != mPublicToken.end()) // if(mPublicToken.find(token) != mPublicToken.end())
return mPublicToken[token]; // return mPublicToken[token];
} // }
if (!checkRequestStatus(token, status, reqtype, anstype, ts)) if (!checkRequestStatus(token, status, reqtype, anstype, ts))
return RsTokenService::FAILED; return RsTokenService::FAILED;
@ -344,7 +353,7 @@ bool RsGxsDataAccess::cancelRequest(const uint32_t& token)
{ {
RsStackMutex stack(mDataMutex); /****** LOCKED *****/ RsStackMutex stack(mDataMutex); /****** LOCKED *****/
GxsRequest* req = locked_retrieveRequest(token); GxsRequest* req = locked_retrieveCompetedRequest(token);
if (!req) if (!req)
{ {
return false; return false;
@ -371,6 +380,10 @@ bool RsGxsDataAccess::locked_clearRequest(const uint32_t& token)
delete it->second; delete it->second;
mCompletedRequests.erase(it); mCompletedRequests.erase(it);
auto it2 = mPublicToken.find(token);
if(it2 != mPublicToken.end())
mPublicToken.erase(it2);
return true; return true;
} }
@ -378,7 +391,7 @@ bool RsGxsDataAccess::getGroupSummary(const uint32_t& token, std::list<const RsG
{ {
RS_STACK_MUTEX(mDataMutex); RS_STACK_MUTEX(mDataMutex);
GxsRequest* req = locked_retrieveRequest(token); GxsRequest* req = locked_retrieveCompetedRequest(token);
if(req == NULL) if(req == NULL)
{ {
@ -409,7 +422,7 @@ bool RsGxsDataAccess::getGroupData(const uint32_t& token, std::list<RsNxsGrp*>&
{ {
RS_STACK_MUTEX(mDataMutex); RS_STACK_MUTEX(mDataMutex);
GxsRequest* req = locked_retrieveRequest(token); GxsRequest* req = locked_retrieveCompetedRequest(token);
if(req == NULL) if(req == NULL)
{ {
@ -447,7 +460,7 @@ bool RsGxsDataAccess::getMsgData(const uint32_t& token, NxsMsgDataResult& msgDat
RsStackMutex stack(mDataMutex); RsStackMutex stack(mDataMutex);
GxsRequest* req = locked_retrieveRequest(token); GxsRequest* req = locked_retrieveCompetedRequest(token);
if(req == NULL) if(req == NULL)
{ {
@ -478,7 +491,7 @@ bool RsGxsDataAccess::getMsgRelatedData(const uint32_t &token, NxsMsgRelatedData
RsStackMutex stack(mDataMutex); RsStackMutex stack(mDataMutex);
GxsRequest* req = locked_retrieveRequest(token); GxsRequest* req = locked_retrieveCompetedRequest(token);
if(req == NULL) if(req == NULL)
{ {
@ -511,7 +524,7 @@ bool RsGxsDataAccess::getMsgSummary(const uint32_t& token, GxsMsgMetaResult& msg
RsStackMutex stack(mDataMutex); RsStackMutex stack(mDataMutex);
GxsRequest* req = locked_retrieveRequest(token); GxsRequest* req = locked_retrieveCompetedRequest(token);
if(req == NULL) if(req == NULL)
{ {
@ -538,7 +551,7 @@ bool RsGxsDataAccess::getMsgRelatedSummary(const uint32_t &token, MsgRelatedMeta
{ {
RsStackMutex stack(mDataMutex); RsStackMutex stack(mDataMutex);
GxsRequest* req = locked_retrieveRequest(token); GxsRequest* req = locked_retrieveCompetedRequest(token);
if(req == NULL) if(req == NULL)
{ {
@ -569,7 +582,7 @@ bool RsGxsDataAccess::getMsgRelatedList(const uint32_t &token, MsgRelatedIdResul
{ {
RsStackMutex stack(mDataMutex); RsStackMutex stack(mDataMutex);
GxsRequest* req = locked_retrieveRequest(token); GxsRequest* req = locked_retrieveCompetedRequest(token);
if(req == NULL) if(req == NULL)
{ {
@ -599,7 +612,7 @@ bool RsGxsDataAccess::getMsgList(const uint32_t& token, GxsMsgIdResult& msgIds)
{ {
RsStackMutex stack(mDataMutex); RsStackMutex stack(mDataMutex);
GxsRequest* req = locked_retrieveRequest(token); GxsRequest* req = locked_retrieveCompetedRequest(token);
if(req == NULL) if(req == NULL)
{ {
@ -626,7 +639,7 @@ bool RsGxsDataAccess::getGroupList(const uint32_t& token, std::list<RsGxsGroupId
{ {
RsStackMutex stack(mDataMutex); RsStackMutex stack(mDataMutex);
GxsRequest* req = locked_retrieveRequest(token); GxsRequest* req = locked_retrieveCompetedRequest(token);
if(req == NULL) if(req == NULL)
{ {
@ -653,7 +666,7 @@ bool RsGxsDataAccess::getGroupStatistic(const uint32_t &token, GxsGroupStatistic
{ {
RsStackMutex stack(mDataMutex); RsStackMutex stack(mDataMutex);
GxsRequest* req = locked_retrieveRequest(token); GxsRequest* req = locked_retrieveCompetedRequest(token);
if(req == NULL) if(req == NULL)
{ {
@ -677,7 +690,7 @@ bool RsGxsDataAccess::getServiceStatistic(const uint32_t &token, GxsServiceStati
{ {
RsStackMutex stack(mDataMutex); RsStackMutex stack(mDataMutex);
GxsRequest* req = locked_retrieveRequest(token); GxsRequest* req = locked_retrieveCompetedRequest(token);
if(req == NULL) if(req == NULL)
{ {
@ -696,7 +709,7 @@ bool RsGxsDataAccess::getServiceStatistic(const uint32_t &token, GxsServiceStati
locked_clearRequest(token); locked_clearRequest(token);
return true; return true;
} }
GxsRequest* RsGxsDataAccess::locked_retrieveRequest(const uint32_t& token) GxsRequest* RsGxsDataAccess::locked_retrieveCompetedRequest(const uint32_t& token)
{ {
auto it = mCompletedRequests.find(token) ; auto it = mCompletedRequests.find(token) ;
@ -711,6 +724,9 @@ GxsRequest* RsGxsDataAccess::locked_retrieveRequest(const uint32_t& token)
void RsGxsDataAccess::processRequests() void RsGxsDataAccess::processRequests()
{ {
// process requests // process requests
#ifdef DATA_DEBUG
RsDbg() << "processing requests" << std::endl;
#endif
while (!mRequestQueue.empty()) while (!mRequestQueue.empty())
{ {
@ -732,11 +748,14 @@ void RsGxsDataAccess::processRequests()
switch( mRequestQueue.begin()->second->status ) switch( mRequestQueue.begin()->second->status )
{ {
case PARTIAL: case PARTIAL:
RsErr() << "Found partial request in mRequestQueue. This is a bug." << std::endl; // fallthrough
case COMPLETE: case COMPLETE:
case DONE: case DONE:
RsErr() << "Found partial/done/complete request in mRequestQueue. This is a bug." << std::endl; // fallthrough
case FAILED: case FAILED:
case CANCELLED: case CANCELLED:
#ifdef DATA_DEBUG
RsDbg() << " request " << mRequestQueue.begin()->second->token << ": status = " << mRequestQueue.begin()->second->status << ": removing from the RequestQueue" << std::endl;
#endif
mRequestQueue.erase(mRequestQueue.begin()); mRequestQueue.erase(mRequestQueue.begin());
continue; continue;
break; break;
@ -765,9 +784,7 @@ void RsGxsDataAccess::processRequests()
ServiceStatisticRequest* ssr; ServiceStatisticRequest* ssr;
#ifdef DATA_DEBUG #ifdef DATA_DEBUG
std::cerr << "RsGxsDataAccess::processRequests() Processing Token: " << req->token << " Status: " RsDbg() << "Processing request: " << req->token << " Status: " << req->status << " ReqType: " << req->reqType << " Age: " << time(NULL) - req->reqTime << std::endl;
<< req->status << " ReqType: " << req->reqType << " Age: "
<< time(NULL) - req->reqTime << std::endl;
#endif #endif
/* PROCESS REQUEST! */ /* PROCESS REQUEST! */
@ -813,12 +830,8 @@ void RsGxsDataAccess::processRequests()
{ {
ok = getGroupSerializedData(grr); ok = getGroupSerializedData(grr);
} }
else else
{ std::cerr << "RsGxsDataAccess::processRequests() Failed to process request, token: " << req->token << std::endl;
std::cerr << "RsGxsDataAccess::processRequests() Failed to process request, token: "
<< req->token << std::endl;
}
// We cannot easily remove the request here because the queue may have more elements now and mRequestQueue.begin() is not necessarily the same element. // We cannot easily remove the request here because the queue may have more elements now and mRequestQueue.begin() is not necessarily the same element.
// but we mark it as COMPLETE/FAILED so that it will be removed in the next loop. // but we mark it as COMPLETE/FAILED so that it will be removed in the next loop.
@ -829,11 +842,21 @@ void RsGxsDataAccess::processRequests()
{ {
// When the request is complete, we move it to the complete list, so that the caller can easily retrieve the request data // When the request is complete, we move it to the complete list, so that the caller can easily retrieve the request data
#ifdef DATA_DEBUG
RsDbg() << " Request completed successfully. Marking as COMPLETE." << std::endl;
#endif
req->status = COMPLETE ; req->status = COMPLETE ;
mCompletedRequests[req->token] = req; mCompletedRequests[req->token] = req;
mPublicToken[req->token] = COMPLETE;
} }
else else
{
req->status = FAILED; req->status = FAILED;
mPublicToken[req->token] = FAILED;
#ifdef DATA_DEBUG
RsDbg() << " Request failed. Marking as FAILED." << std::endl;
#endif
}
} }
} // END OF MUTEX. } // END OF MUTEX.
@ -1680,23 +1703,37 @@ void RsGxsDataAccess::filterGrpList(std::list<RsGxsGroupId> &grpIds, const RsTok
} }
bool RsGxsDataAccess::checkRequestStatus( bool RsGxsDataAccess::checkRequestStatus( uint32_t token, GxsRequestStatus& status, uint32_t& reqtype, uint32_t& anstype, rstime_t& ts )
uint32_t token, GxsRequestStatus& status, uint32_t& reqtype,
uint32_t& anstype, rstime_t& ts )
{ {
RS_STACK_MUTEX(mDataMutex); RS_STACK_MUTEX(mDataMutex);
GxsRequest* req = locked_retrieveRequest(token); GxsRequest* req = locked_retrieveCompetedRequest(token);
if (req == NULL || req->status == CANCELLED) std::cerr << "CheckRequestStatus: token=" << token ;
return false;
anstype = req->ansType; if(req != NULL)
reqtype = req->reqType; {
status = req->status; anstype = req->ansType;
ts = req->reqTime; reqtype = req->reqType;
status = COMPLETE;
ts = req->reqTime;
return true; std::cerr << " In mCompletedRequests. Returning status = COMPLETE" << std::endl;
return true;
}
auto it = mPublicToken.find(token);
if(it != mPublicToken.end())
{
status = it->second;
std::cerr << " In mPublicToken. Returning status = " << status << std::endl;
return true;
}
status = FAILED;
std::cerr << " Token not found. Returning FAILED" << std::endl;
return false;
} }
bool RsGxsDataAccess::addGroupData(RsNxsGrp* grp) { bool RsGxsDataAccess::addGroupData(RsNxsGrp* grp) {
@ -1759,7 +1796,7 @@ void RsGxsDataAccess::tokenList(std::list<uint32_t>& tokens)
bool RsGxsDataAccess::locked_updateRequestStatus( uint32_t token, RsTokenService::GxsRequestStatus status ) bool RsGxsDataAccess::locked_updateRequestStatus( uint32_t token, RsTokenService::GxsRequestStatus status )
{ {
GxsRequest* req = locked_retrieveRequest(token); GxsRequest* req = locked_retrieveCompetedRequest(token);
if(req) req->status = status; if(req) req->status = status;
else return false; else return false;
@ -1774,7 +1811,7 @@ uint32_t RsGxsDataAccess::generatePublicToken()
{ {
RS_STACK_MUTEX(mDataMutex); RS_STACK_MUTEX(mDataMutex);
mPublicToken[token] = RsTokenService::PENDING; mPublicToken[token] = PENDING ;
} }
return token; return token;
@ -1782,15 +1819,19 @@ uint32_t RsGxsDataAccess::generatePublicToken()
bool RsGxsDataAccess::updatePublicRequestStatus( bool RsGxsDataAccess::updatePublicRequestStatus( uint32_t token, RsTokenService::GxsRequestStatus status )
uint32_t token, RsTokenService::GxsRequestStatus status )
{ {
RS_STACK_MUTEX(mDataMutex); RS_STACK_MUTEX(mDataMutex);
std::map<uint32_t, RsTokenService::GxsRequestStatus>::iterator mit =
mPublicToken.find(token); auto mit = mPublicToken.find(token);
if(mit != mPublicToken.end()) mit->second = status;
else return false; if(mit != mPublicToken.end())
return true; {
mit->second = status;
return true;
}
else
return false;
} }
@ -1798,11 +1839,14 @@ bool RsGxsDataAccess::updatePublicRequestStatus(
bool RsGxsDataAccess::disposeOfPublicToken(uint32_t token) bool RsGxsDataAccess::disposeOfPublicToken(uint32_t token)
{ {
RS_STACK_MUTEX(mDataMutex); RS_STACK_MUTEX(mDataMutex);
std::map<uint32_t, RsTokenService::GxsRequestStatus>::iterator mit = auto mit = mPublicToken.find(token);
mPublicToken.find(token); if(mit != mPublicToken.end())
if(mit != mPublicToken.end()) mPublicToken.erase(mit); {
else return false; mPublicToken.erase(mit);
return true; return true;
}
else
return false;
} }
bool RsGxsDataAccess::checkGrpFilter(const RsTokReqOptions &opts, const RsGxsGrpMetaData *meta) const bool RsGxsDataAccess::checkGrpFilter(const RsTokReqOptions &opts, const RsGxsGrpMetaData *meta) const

View file

@ -274,7 +274,7 @@ private:
* @param token the value of the token for the request object handle wanted * @param token the value of the token for the request object handle wanted
* @return the request associated to this token * @return the request associated to this token
*/ */
GxsRequest* locked_retrieveRequest(const uint32_t& token); GxsRequest* locked_retrieveCompetedRequest(const uint32_t& token);
/*! /*!
* Add a gxs request to queue * Add a gxs request to queue

View file

@ -37,14 +37,14 @@ enum class GxsRequestPriority {
struct GxsRequest struct GxsRequest
{ {
GxsRequest() : GxsRequest() :
token(0), reqTime(0), ansType(0), reqType(0), token(0), reqTime(0), ansType(0), reqType(0),priority(GxsRequestPriority::NORMAL),
status(RsTokenService::FAILED) {} status(RsTokenService::FAILED) {}
virtual ~GxsRequest() {} virtual ~GxsRequest() {}
uint32_t token; uint32_t token;
uint32_t reqTime; uint32_t reqTime;
RS_DEPRECATED uint32_t ansType; /// G10h4ck: This is of no use RS_DEPRECATED uint32_t ansType; /// G10h4ck: This is of no use. csoler: it's made available to the clients.
uint32_t reqType; uint32_t reqType;
GxsRequestPriority priority; GxsRequestPriority priority;
RsTokReqOptions Options; RsTokReqOptions Options;