experimental implementation of TokenQueue priority. not fully functional yet

This commit is contained in:
csoler 2020-04-05 22:59:58 +02:00
parent 9a1d589134
commit 2d23a9f251
No known key found for this signature in database
GPG Key ID: 7BCA522266C0804C
3 changed files with 281 additions and 368 deletions

View File

@ -32,14 +32,20 @@
#define DATA_DEBUG 1 #define DATA_DEBUG 1
bool operator<(const std::pair<uint32_t,GxsRequest*>& p1,const std::pair<uint32_t,GxsRequest*>& p2)
{
return p1.second->priority <= p2.second->priority ; // <= so that new elements with same priority are inserted before
}
RsGxsDataAccess::RsGxsDataAccess(RsGeneralDataService* ds) : RsGxsDataAccess::RsGxsDataAccess(RsGeneralDataService* ds) :
mDataStore(ds), mDataMutex("RsGxsDataAccess"), mNextToken(0) {} mDataStore(ds), mDataMutex("RsGxsDataAccess"), mNextToken(0) {}
RsGxsDataAccess::~RsGxsDataAccess() RsGxsDataAccess::~RsGxsDataAccess()
{ {
for(std::map<uint32_t, GxsRequest*>::const_iterator it(mRequests.begin());it!=mRequests.end();++it) for(auto& it:mRequestQueue)
delete it->second ; delete it.second;
} }
bool RsGxsDataAccess::requestGroupInfo( bool RsGxsDataAccess::requestGroupInfo(
uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts, uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts,
@ -309,7 +315,8 @@ void RsGxsDataAccess::storeRequest(GxsRequest* req)
RS_STACK_MUTEX(mDataMutex); RS_STACK_MUTEX(mDataMutex);
req->status = PENDING; req->status = PENDING;
req->reqTime = time(NULL); req->reqTime = time(NULL);
mRequests[req->token] = req;
mRequestQueue.insert(std::make_pair(req->token,req));
} }
RsTokenService::GxsRequestStatus RsGxsDataAccess::requestStatus(uint32_t token) RsTokenService::GxsRequestStatus RsGxsDataAccess::requestStatus(uint32_t token)
@ -350,25 +357,25 @@ bool RsGxsDataAccess::cancelRequest(const uint32_t& token)
bool RsGxsDataAccess::clearRequest(const uint32_t& token) bool RsGxsDataAccess::clearRequest(const uint32_t& token)
{ {
RsStackMutex stack(mDataMutex); /****** LOCKED *****/ RS_STACK_MUTEX(mDataMutex);
return locked_clearRequest(token);
}
std::map<uint32_t, GxsRequest*>::iterator it; bool RsGxsDataAccess::locked_clearRequest(const uint32_t& token)
{
auto it = mCompletedRequests.find(token);
it = mRequests.find(token); if(it == mCompletedRequests.end())
if (it == mRequests.end()) return false;
{
return false;
}
delete it->second; delete it->second;
mRequests.erase(it); mCompletedRequests.erase(it);
return true; return true;
} }
bool RsGxsDataAccess::getGroupSummary(const uint32_t& token, std::list<const RsGxsGrpMetaData*>& groupInfo) bool RsGxsDataAccess::getGroupSummary(const uint32_t& token, std::list<const RsGxsGrpMetaData*>& groupInfo)
{ {
RS_STACK_MUTEX(mDataMutex); RS_STACK_MUTEX(mDataMutex);
GxsRequest* req = locked_retrieveRequest(token); GxsRequest* req = locked_retrieveRequest(token);
@ -379,29 +386,21 @@ bool RsGxsDataAccess::getGroupSummary(const uint32_t& token, std::list<const RsG
<< "group summary" << std::endl; << "group summary" << std::endl;
return false; return false;
} }
else if(req->status == COMPLETE)
{
GroupMetaReq* gmreq = dynamic_cast<GroupMetaReq*>(req);
if(gmreq) GroupMetaReq* gmreq = dynamic_cast<GroupMetaReq*>(req);
{
groupInfo = gmreq->mGroupMetaData; if(gmreq)
gmreq->mGroupMetaData.clear(); {
locked_updateRequestStatus(token, DONE); groupInfo = gmreq->mGroupMetaData;
} gmreq->mGroupMetaData.clear();
else
{
std::cerr << "RsGxsDataAccess::getGroupSummary() Req found, failed"
<< "cast" << std::endl;
return false;
}
} }
else else
{ {
std::cerr << "RsGxsDataAccess::getGroupSummary() Req not ready" std::cerr << "RsGxsDataAccess::getGroupSummary() Req found, failed"
<< std::endl; << "cast" << std::endl;
return false; return false;
} }
locked_clearRequest(token);
return true; return true;
} }
@ -418,36 +417,27 @@ bool RsGxsDataAccess::getGroupData(const uint32_t& token, std::list<RsNxsGrp*>&
<< "data" << std::endl; << "data" << std::endl;
return false; return false;
} }
else if(req->status == COMPLETE)
GroupDataReq* gmreq = dynamic_cast<GroupDataReq*>(req);
GroupSerializedDataReq* gsreq = dynamic_cast<GroupSerializedDataReq*>(req);
if(gsreq)
{ {
GroupDataReq* gmreq = dynamic_cast<GroupDataReq*>(req); grpData.swap(gsreq->mGroupData);
GroupSerializedDataReq* gsreq = dynamic_cast<GroupSerializedDataReq*>(req); gsreq->mGroupData.clear();
}
if(gsreq) else if(gmreq)
{ {
grpData.swap(gsreq->mGroupData); grpData.swap(gmreq->mGroupData);
gsreq->mGroupData.clear(); gmreq->mGroupData.clear();
locked_updateRequestStatus(token, DONE);
}
else if(gmreq)
{
grpData.swap(gmreq->mGroupData);
gmreq->mGroupData.clear();
locked_updateRequestStatus(token, DONE);
}
else
{
std::cerr << "RsGxsDataAccess::getGroupData() Req found, failed cast"
<< " req->reqType: " << req->reqType << std::endl;
return false;
}
} }
else else
{ {
std::cerr << "RsGxsDataAccess::getGroupData() Req not ready" << std::endl; std::cerr << "RsGxsDataAccess::getGroupData() Req found, failed cast"
<< " req->reqType: " << req->reqType << std::endl;
return false; return false;
} }
locked_clearRequest(token);
return true; return true;
} }
@ -459,29 +449,26 @@ bool RsGxsDataAccess::getMsgData(const uint32_t& token, NxsMsgDataResult& msgDat
GxsRequest* req = locked_retrieveRequest(token); GxsRequest* req = locked_retrieveRequest(token);
if(req == NULL){ if(req == NULL)
{
std::cerr << "RsGxsDataAccess::getMsgData() Unable to retrieve group data" << std::endl; std::cerr << "RsGxsDataAccess::getMsgData() Unable to retrieve group data" << std::endl;
return false; return false;
}else if(req->status == COMPLETE){ }
MsgDataReq* mdreq = dynamic_cast<MsgDataReq*>(req); MsgDataReq* mdreq = dynamic_cast<MsgDataReq*>(req);
if(mdreq) if(mdreq)
{ {
msgData.swap(mdreq->mMsgData); msgData.swap(mdreq->mMsgData);
mdreq->mMsgData.clear(); mdreq->mMsgData.clear();
locked_updateRequestStatus(token, DONE); }
} else
else {
{ std::cerr << "RsGxsDataAccess::getMsgData() Req found, failed caste" << std::endl;
std::cerr << "RsGxsDataAccess::getMsgData() Req found, failed caste" << std::endl;
return false;
}
}else{
std::cerr << "RsGxsDataAccess::getMsgData() Req not ready" << std::endl;
return false; return false;
} }
locked_clearRequest(token);
return true; return true;
} }
@ -493,32 +480,28 @@ bool RsGxsDataAccess::getMsgRelatedData(const uint32_t &token, NxsMsgRelatedData
GxsRequest* req = locked_retrieveRequest(token); GxsRequest* req = locked_retrieveRequest(token);
if(req == NULL){ if(req == NULL)
{
std::cerr << "RsGxsDataAccess::getMsgRelatedData() Unable to retrieve group data" << std::endl;
return false;
}
MsgRelatedInfoReq* mrireq = dynamic_cast<MsgRelatedInfoReq*>(req);
std::cerr << "RsGxsDataAccess::getMsgRelatedData() Unable to retrieve group data" << std::endl; if(req->Options.mReqType != GXS_REQUEST_TYPE_MSG_RELATED_DATA)
return false; return false;
}else if(req->status == COMPLETE){
MsgRelatedInfoReq* mrireq = dynamic_cast<MsgRelatedInfoReq*>(req); if(mrireq)
{
msgData.swap(mrireq->mMsgDataResult);
mrireq->mMsgDataResult.clear();
}
else
{
std::cerr << "RsGxsDataAccess::getMsgRelatedData() Req found, failed caste" << std::endl;
return false;
}
if(req->Options.mReqType != GXS_REQUEST_TYPE_MSG_RELATED_DATA) locked_clearRequest(token);
return false;
if(mrireq)
{
msgData.swap(mrireq->mMsgDataResult);
mrireq->mMsgDataResult.clear();
locked_updateRequestStatus(token, DONE);
}
else
{
std::cerr << "RsGxsDataAccess::getMsgRelatedData() Req found, failed caste" << std::endl;
return false;
}
}else{
std::cerr << "RsGxsDataAccess::getMsgRelatedData() Req not ready" << std::endl;
return false;
}
return true; return true;
} }
@ -530,111 +513,86 @@ bool RsGxsDataAccess::getMsgSummary(const uint32_t& token, GxsMsgMetaResult& msg
GxsRequest* req = locked_retrieveRequest(token); GxsRequest* req = locked_retrieveRequest(token);
if(req == NULL){ if(req == NULL)
{
std::cerr << "RsGxsDataAccess::getMsgSummary() Unable to retrieve group data" << std::endl; std::cerr << "RsGxsDataAccess::getMsgSummary() Unable to retrieve group data" << std::endl;
return false; return false;
}else if(req->status == COMPLETE){ }
MsgMetaReq* mmreq = dynamic_cast<MsgMetaReq*>(req);
MsgMetaReq* mmreq = dynamic_cast<MsgMetaReq*>(req); if(mmreq)
{
if(mmreq) msgInfo.swap(mmreq->mMsgMetaData);
{ mmreq->mMsgMetaData.clear();
msgInfo.swap(mmreq->mMsgMetaData); }
mmreq->mMsgMetaData.clear(); else
locked_updateRequestStatus(token, DONE); {
std::cerr << "RsGxsDataAccess::getMsgSummary() Req found, failed caste" << std::endl;
}
else
{
std::cerr << "RsGxsDataAccess::getMsgSummary() Req found, failed caste" << std::endl;
return false;
}
}else{
std::cerr << "RsGxsDataAccess::getMsgSummary() Req not ready" << std::endl;
return false; return false;
} }
locked_clearRequest(token);
return true; return true;
} }
bool RsGxsDataAccess::getMsgRelatedSummary(const uint32_t &token, MsgRelatedMetaResult &msgMeta) bool RsGxsDataAccess::getMsgRelatedSummary(const uint32_t &token, MsgRelatedMetaResult &msgMeta)
{ {
RsStackMutex stack(mDataMutex);
RsStackMutex stack(mDataMutex); GxsRequest* req = locked_retrieveRequest(token);
GxsRequest* req = locked_retrieveRequest(token); if(req == NULL)
{
std::cerr << "RsGxsDataAccess::getMsgRelatedSummary() Unable to retrieve message summary" << std::endl;
return false;
}
if(req->Options.mReqType != GXS_REQUEST_TYPE_MSG_RELATED_META)
return false;
MsgRelatedInfoReq* mrireq = dynamic_cast<MsgRelatedInfoReq*>(req);
if(mrireq)
if(req == NULL){ {
msgMeta.swap(mrireq->mMsgMetaResult);
std::cerr << "RsGxsDataAccess::getMsgRelatedSummary() Unable to retrieve message summary" << std::endl; mrireq->mMsgMetaResult.clear();
return false; }
}else if(req->status == COMPLETE){ else
{
if(req->Options.mReqType != GXS_REQUEST_TYPE_MSG_RELATED_META) std::cerr << "RsGxsDataAccess::getMsgRelatedSummary() Req found, failed caste" << std::endl;
return false; return false;
}
MsgRelatedInfoReq* mrireq = dynamic_cast<MsgRelatedInfoReq*>(req); locked_clearRequest(token);
return true;
if(mrireq)
{
msgMeta.swap(mrireq->mMsgMetaResult);
mrireq->mMsgMetaResult.clear();
locked_updateRequestStatus(token, DONE);
}
else
{
std::cerr << "RsGxsDataAccess::getMsgRelatedSummary() Req found, failed caste" << std::endl;
return false;
}
}
else
{
std::cerr << "RsGxsDataAccess::getMsgRelatedSummary() Req not ready" << std::endl;
return false;
}
return true;
} }
bool RsGxsDataAccess::getMsgRelatedList(const uint32_t &token, MsgRelatedIdResult &msgIds) bool RsGxsDataAccess::getMsgRelatedList(const uint32_t &token, MsgRelatedIdResult &msgIds)
{ {
RsStackMutex stack(mDataMutex); RsStackMutex stack(mDataMutex);
GxsRequest* req = locked_retrieveRequest(token); GxsRequest* req = locked_retrieveRequest(token);
if(req == NULL){ if(req == NULL)
{
std::cerr << "RsGxsDataAccess::getMsgRelatedList() Unable to retrieve message data" << std::endl;
return false;
}
if(req->Options.mReqType != GXS_REQUEST_TYPE_MSG_RELATED_IDS)
return false;
std::cerr << "RsGxsDataAccess::getMsgRelatedList() Unable to retrieve message data" << std::endl; MsgRelatedInfoReq* mrireq = dynamic_cast<MsgRelatedInfoReq*>(req);
return false;
}else if(req->status == COMPLETE){
if(req->Options.mReqType != GXS_REQUEST_TYPE_MSG_RELATED_IDS) if(mrireq)
return false; {
msgIds.swap(mrireq->mMsgIdResult);
MsgRelatedInfoReq* mrireq = dynamic_cast<MsgRelatedInfoReq*>(req); mrireq->mMsgIdResult.clear();
}
if(mrireq) else
{ {
msgIds.swap(mrireq->mMsgIdResult); std::cerr << "RsGxsDataAccess::getMsgRelatedList() Req found, failed caste" << std::endl;
mrireq->mMsgIdResult.clear(); return false;
locked_updateRequestStatus(token, DONE); }
} locked_clearRequest(token);
else{ return true;
std::cerr << "RsGxsDataAccess::getMsgRelatedList() Req found, failed caste" << std::endl;
return false;
}
}
else
{
std::cerr << "RsGxsDataAccess::getMsgRelatedList() Req not ready" << std::endl;
return false;
}
return true;
} }
bool RsGxsDataAccess::getMsgList(const uint32_t& token, GxsMsgIdResult& msgIds) bool RsGxsDataAccess::getMsgList(const uint32_t& token, GxsMsgIdResult& msgIds)
@ -643,29 +601,24 @@ bool RsGxsDataAccess::getMsgList(const uint32_t& token, GxsMsgIdResult& msgIds)
GxsRequest* req = locked_retrieveRequest(token); GxsRequest* req = locked_retrieveRequest(token);
if(req == NULL){ if(req == NULL)
{
std::cerr << "RsGxsDataAccess::getMsgList() Unable to retrieve msg Ids" << std::endl; std::cerr << "RsGxsDataAccess::getMsgList() Unable to retrieve msg Ids" << std::endl;
return false;
}else if(req->status == COMPLETE){
MsgIdReq* mireq = dynamic_cast<MsgIdReq*>(req);
if(mireq)
{
msgIds.swap(mireq->mMsgIdResult);
mireq->mMsgIdResult.clear();
locked_updateRequestStatus(token, DONE);
}
else{
std::cerr << "RsGxsDataAccess::getMsgList() Req found, failed caste" << std::endl;
return false;
}
}else{
std::cerr << "RsGxsDataAccess::getMsgList() Req not ready" << std::endl;
return false; return false;
} }
MsgIdReq* mireq = dynamic_cast<MsgIdReq*>(req);
if(mireq)
{
msgIds.swap(mireq->mMsgIdResult);
mireq->mMsgIdResult.clear();
}
else
{
std::cerr << "RsGxsDataAccess::getMsgList() Req found, failed caste" << std::endl;
return false;
}
locked_clearRequest(token);
return true; return true;
} }
@ -675,125 +628,126 @@ bool RsGxsDataAccess::getGroupList(const uint32_t& token, std::list<RsGxsGroupId
GxsRequest* req = locked_retrieveRequest(token); GxsRequest* req = locked_retrieveRequest(token);
if(req == NULL){ if(req == NULL)
{
std::cerr << "RsGxsDataAccess::getGroupList() Unable to retrieve group Ids," std::cerr << "RsGxsDataAccess::getGroupList() Unable to retrieve group Ids, Request does not exist" << std::endl;
"\nRequest does not exist" << std::endl;
return false;
}else if(req->status == COMPLETE){
GroupIdReq* gireq = dynamic_cast<GroupIdReq*>(req);
if(gireq)
{
groupIds.swap(gireq->mGroupIdResult);
gireq->mGroupIdResult.clear();
locked_updateRequestStatus(token, DONE);
}else{
std::cerr << "RsGxsDataAccess::getGroupList() Req found, failed caste" << std::endl;
return false;
}
}else{
std::cerr << "RsGxsDataAccess::getGroupList() Req not ready" << std::endl;
return false; return false;
} }
GroupIdReq* gireq = dynamic_cast<GroupIdReq*>(req);
if(gireq)
{
groupIds.swap(gireq->mGroupIdResult);
gireq->mGroupIdResult.clear();
}
else
{
std::cerr << "RsGxsDataAccess::getGroupList() Req found, failed caste" << std::endl;
return false;
}
locked_clearRequest(token);
return true; return true;
} }
bool RsGxsDataAccess::getGroupStatistic(const uint32_t &token, GxsGroupStatistic &grpStatistic)
{
RsStackMutex stack(mDataMutex);
GxsRequest* req = locked_retrieveRequest(token);
if(req == NULL)
{
std::cerr << "RsGxsDataAccess::getGroupStatistic() Unable to retrieve grp stats" << std::endl;
return false;
}
GroupStatisticRequest* gsreq = dynamic_cast<GroupStatisticRequest*>(req);
if(gsreq)
grpStatistic = gsreq->mGroupStatistic;
else
{
std::cerr << "RsGxsDataAccess::getGroupStatistic() Req found, failed caste" << std::endl;
return false;
}
locked_clearRequest(token);
return true;
}
bool RsGxsDataAccess::getServiceStatistic(const uint32_t &token, GxsServiceStatistic &servStatistic)
{
RsStackMutex stack(mDataMutex);
GxsRequest* req = locked_retrieveRequest(token);
if(req == NULL)
{
std::cerr << "RsGxsDataAccess::getServiceStatistic() Unable to retrieve service stats" << std::endl;
return false;
}
ServiceStatisticRequest* ssreq = dynamic_cast<ServiceStatisticRequest*>(req);
if(ssreq)
servStatistic = ssreq->mServiceStatistic;
else
{
std::cerr << "RsGxsDataAccess::getServiceStatistic() Req found, failed caste" << std::endl;
return false;
}
locked_clearRequest(token);
return true;
}
GxsRequest* RsGxsDataAccess::locked_retrieveRequest(const uint32_t& token) GxsRequest* RsGxsDataAccess::locked_retrieveRequest(const uint32_t& token)
{ {
auto it = mCompletedRequests.find(token) ;
if(mRequests.find(token) == mRequests.end()) return NULL; if(it == mCompletedRequests.end())
return NULL;
GxsRequest* req = mRequests[token]; return it->second;
return req;
} }
#define MAX_REQUEST_AGE 120 // 2 minutes #define MAX_REQUEST_AGE 120 // 2 minutes
void RsGxsDataAccess::processRequests() void RsGxsDataAccess::processRequests()
{ {
std::list<uint32_t> toClear;
rstime_t now = time(NULL);
std::map<uint32_t, GxsRequest*>::iterator it;
{
RsStackMutex stack(mDataMutex); /******* LOCKED *******/
// process status of the requests
for (it = mRequests.begin(); it != mRequests.end(); ++it)
{
GxsRequest* req = it->second;
switch (req->status)
{
case PENDING:
// process request later
break;
case PARTIAL:
// should not happen
req->status = COMPLETE;
break;
case DONE:
#ifdef DATA_DEBUG
std::cerr << "RsGxsDataAccess::processrequests() Clearing Done Request Token: " << req->token;
std::cerr << std::endl;
#endif
toClear.push_back(req->token);
break;
case CANCELLED:
#ifdef DATA_DEBUG
std::cerr << "RsGxsDataAccess::processrequests() Clearing Cancelled Request Token: " << req->token;
std::cerr << std::endl;
#endif
toClear.push_back(req->token);
break;
default:
if (now - req->reqTime > MAX_REQUEST_AGE)
{
#ifdef DATA_DEBUG
std::cerr << "RsGxsDataAccess::processrequests() Clearing Old Request Token: " << req->token;
std::cerr << std::endl;
#endif
toClear.push_back(req->token);
}
}
}
} // END OF MUTEX.
// clear requests
std::list<uint32_t>::iterator cit;
for (cit = toClear.begin(); cit != toClear.end(); ++cit)
{
clearRequest(*cit);
}
// We hve to make a copy of the mRequest list because it can be modified while we treat the requests.
// This may happen because the mutex cannot be added around the full loop since it takes too much time.
std::map<uint32_t,GxsRequest*> request_list_copy;
{
RS_STACK_MUTEX(mDataMutex);
request_list_copy = mRequests;
}
// process requests // process requests
while (true)
while (!mRequestQueue.empty())
{ {
GxsRequest* req = NULL; // Extract the first elements from the request queue. cleanup all other elements marked at terminated.
GxsRequest* req = nullptr;
{ {
// get the first pending request RsStackMutex stack(mDataMutex); /******* LOCKED *******/
for (auto it:request_list_copy) rstime_t now = time(NULL); // this is ok while in the loop below
if (it.second->status == PENDING)
{ while(!mRequestQueue.empty() && req == nullptr)
req = it.second; {
if(now > mRequestQueue.begin()->second->reqTime + MAX_REQUEST_AGE)
{
mRequestQueue.erase(mRequestQueue.begin());
continue;
}
switch( mRequestQueue.begin()->second->status )
{
case PARTIAL:
case COMPLETE:
case DONE:
RsErr() << "Found partial/done/complete request in mRequestQueue. This is a bug." << std::endl; // fallthrough
case FAILED:
case CANCELLED:
mRequestQueue.erase(mRequestQueue.begin());
continue;
break;
case PENDING:
req = mRequestQueue.begin()->second;
req->status = PARTIAL; req->status = PARTIAL;
break; break;
} }
}
}
}
if (!req) if (!req)
break; break;
@ -813,7 +767,7 @@ void RsGxsDataAccess::processRequests()
#ifdef DATA_DEBUG #ifdef DATA_DEBUG
std::cerr << "RsGxsDataAccess::processRequests() Processing Token: " << req->token << " Status: " std::cerr << "RsGxsDataAccess::processRequests() Processing Token: " << req->token << " Status: "
<< req->status << " ReqType: " << req->reqType << " Age: " << req->status << " ReqType: " << req->reqType << " Age: "
<< now - req->reqTime << std::endl; << time(NULL) - req->reqTime << std::endl;
#endif #endif
/* PROCESS REQUEST! */ /* PROCESS REQUEST! */
@ -866,77 +820,26 @@ void RsGxsDataAccess::processRequests()
<< req->token << std::endl; << req->token << std::endl;
} }
// We cannot easily remove the request here because the queue may have more elements now and mRequestQueue.begin() is not necessarily the same element.
// but we mark it as COMPLETE/FAILED so that it will be removed in the next loop.
{ {
RsStackMutex stack(mDataMutex); /******* LOCKED *******/ RsStackMutex stack(mDataMutex); /******* LOCKED *******/
if (req->status == PARTIAL)
{
req->status = ok ? COMPLETE : FAILED;
}
} // END OF MUTEX.
}
}
bool RsGxsDataAccess::getGroupStatistic(const uint32_t &token, GxsGroupStatistic &grpStatistic) if(ok)
{ {
RsStackMutex stack(mDataMutex); // When the request is complete, we move it to the complete list, so that the caller can easily retrieve the request data
GxsRequest* req = locked_retrieveRequest(token); req->status = COMPLETE ;
mCompletedRequests[req->token] = req;
if(req == NULL){ }
else
std::cerr << "RsGxsDataAccess::getGroupStatistic() Unable to retrieve grp stats" << std::endl; req->status = FAILED;
return false;
}else if(req->status == COMPLETE){
GroupStatisticRequest* gsreq = dynamic_cast<GroupStatisticRequest*>(req);
if(gsreq)
{
grpStatistic = gsreq->mGroupStatistic;
locked_updateRequestStatus(token, DONE);
}
else{
std::cerr << "RsGxsDataAccess::getGroupStatistic() Req found, failed caste" << std::endl;
return false;
} }
}else{
std::cerr << "RsGxsDataAccess::getGroupStatistic() Req not ready" << std::endl;
return false;
}
return true; } // END OF MUTEX.
} }
bool RsGxsDataAccess::getServiceStatistic(const uint32_t &token, GxsServiceStatistic &servStatistic)
{
RsStackMutex stack(mDataMutex);
GxsRequest* req = locked_retrieveRequest(token);
if(req == NULL){
std::cerr << "RsGxsDataAccess::getServiceStatistic() Unable to retrieve service stats" << std::endl;
return false;
}else if(req->status == COMPLETE){
ServiceStatisticRequest* ssreq = dynamic_cast<ServiceStatisticRequest*>(req);
if(ssreq)
{
servStatistic = ssreq->mServiceStatistic;
locked_updateRequestStatus(token, DONE);
}
else{
std::cerr << "RsGxsDataAccess::getServiceStatistic() Req found, failed caste" << std::endl;
return false;
}
}else{
std::cerr << "RsGxsDataAccess::getServiceStatistic() Req not ready" << std::endl;
return false;
}
return true;
}
bool RsGxsDataAccess::getGroupSerializedData(GroupSerializedDataReq* req) bool RsGxsDataAccess::getGroupSerializedData(GroupSerializedDataReq* req)
{ {
@ -1847,16 +1750,14 @@ void RsGxsDataAccess::tokenList(std::list<uint32_t>& tokens)
RsStackMutex stack(mDataMutex); RsStackMutex stack(mDataMutex);
std::map<uint32_t, GxsRequest*>::iterator mit = mRequests.begin(); for(auto& it:mRequestQueue)
tokens.push_back(it.second->token);
for(; mit != mRequests.end(); ++mit) for(auto& it:mCompletedRequests)
{ tokens.push_back(it.first);
tokens.push_back(mit->first);
}
} }
bool RsGxsDataAccess::locked_updateRequestStatus( bool RsGxsDataAccess::locked_updateRequestStatus( uint32_t token, RsTokenService::GxsRequestStatus status )
uint32_t token, RsTokenService::GxsRequestStatus status )
{ {
GxsRequest* req = locked_retrieveRequest(token); GxsRequest* req = locked_retrieveRequest(token);

View File

@ -22,6 +22,7 @@
#ifndef RSGXSDATAACCESS_H #ifndef RSGXSDATAACCESS_H
#define RSGXSDATAACCESS_H #define RSGXSDATAACCESS_H
#include <queue>
#include "retroshare/rstokenservice.h" #include "retroshare/rstokenservice.h"
#include "rsgxsrequesttypes.h" #include "rsgxsrequesttypes.h"
#include "rsgds.h" #include "rsgds.h"
@ -30,6 +31,8 @@
typedef std::map< RsGxsGroupId, std::map<RsGxsMessageId, RsGxsMsgMetaData*> > MsgMetaFilter; typedef std::map< RsGxsGroupId, std::map<RsGxsMessageId, RsGxsMsgMetaData*> > MsgMetaFilter;
typedef std::map< RsGxsGroupId, RsGxsGrpMetaData* > GrpMetaFilter; typedef std::map< RsGxsGroupId, RsGxsGrpMetaData* > GrpMetaFilter;
bool operator<(const std::pair<uint32_t,GxsRequest*>& p1,const std::pair<uint32_t,GxsRequest*>& p2);
class RsGxsDataAccess : public RsTokenService class RsGxsDataAccess : public RsTokenService
{ {
public: public:
@ -485,6 +488,7 @@ private:
bool getMsgList(const GxsMsgReq& msgIds, const RsTokReqOptions& opts, GxsMsgReq& msgIdsOut); bool getMsgList(const GxsMsgReq& msgIds, const RsTokReqOptions& opts, GxsMsgReq& msgIdsOut);
private: private:
bool locked_clearRequest(const uint32_t &token);
RsGeneralDataService* mDataStore; RsGeneralDataService* mDataStore;
@ -492,10 +496,9 @@ private:
uint32_t mNextToken; uint32_t mNextToken;
std::map<uint32_t, GxsRequestStatus> mPublicToken; std::map<uint32_t, GxsRequestStatus> mPublicToken;
std::map<uint32_t, GxsRequest*> mRequests;
std::set<std::pair<uint32_t,GxsRequest*> > mRequestQueue;
std::map<uint32_t, GxsRequest*> mCompletedRequests;
}; };
#endif // RSGXSDATAACCESS_H #endif // RSGXSDATAACCESS_H

View File

@ -26,6 +26,14 @@
#include "gxs/rsgds.h" #include "gxs/rsgds.h"
#include "util/rsdeprecate.h" #include "util/rsdeprecate.h"
enum class GxsRequestPriority {
VERY_HIGH = 0x00,
HIGH = 0x01,
NORMAL = 0x02,
LOW = 0x03,
VERY_LOW = 0x04,
};
struct GxsRequest struct GxsRequest
{ {
GxsRequest() : GxsRequest() :
@ -38,6 +46,7 @@ struct GxsRequest
RS_DEPRECATED uint32_t ansType; /// G10h4ck: This is of no use RS_DEPRECATED uint32_t ansType; /// G10h4ck: This is of no use
uint32_t reqType; uint32_t reqType;
GxsRequestPriority priority;
RsTokReqOptions Options; RsTokReqOptions Options;
RsTokenService::GxsRequestStatus status; RsTokenService::GxsRequestStatus status;