From 2d23a9f251675e0864a31a3f45a49e17b2f89ea5 Mon Sep 17 00:00:00 2001 From: csoler Date: Sun, 5 Apr 2020 22:59:58 +0200 Subject: [PATCH] experimental implementation of TokenQueue priority. not fully functional yet --- libretroshare/src/gxs/rsgxsdataaccess.cc | 631 +++++++++------------- libretroshare/src/gxs/rsgxsdataaccess.h | 9 +- libretroshare/src/gxs/rsgxsrequesttypes.h | 9 + 3 files changed, 281 insertions(+), 368 deletions(-) diff --git a/libretroshare/src/gxs/rsgxsdataaccess.cc b/libretroshare/src/gxs/rsgxsdataaccess.cc index 15fbd80ae..4a9f32eb6 100644 --- a/libretroshare/src/gxs/rsgxsdataaccess.cc +++ b/libretroshare/src/gxs/rsgxsdataaccess.cc @@ -32,14 +32,20 @@ #define DATA_DEBUG 1 +bool operator<(const std::pair& p1,const std::pair& p2) +{ + return p1.second->priority <= p2.second->priority ; // <= so that new elements with same priority are inserted before +} + + RsGxsDataAccess::RsGxsDataAccess(RsGeneralDataService* ds) : mDataStore(ds), mDataMutex("RsGxsDataAccess"), mNextToken(0) {} RsGxsDataAccess::~RsGxsDataAccess() { - for(std::map::const_iterator it(mRequests.begin());it!=mRequests.end();++it) - delete it->second ; + for(auto& it:mRequestQueue) + delete it.second; } bool RsGxsDataAccess::requestGroupInfo( uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts, @@ -309,7 +315,8 @@ void RsGxsDataAccess::storeRequest(GxsRequest* req) RS_STACK_MUTEX(mDataMutex); req->status = PENDING; req->reqTime = time(NULL); - mRequests[req->token] = req; + + mRequestQueue.insert(std::make_pair(req->token,req)); } RsTokenService::GxsRequestStatus RsGxsDataAccess::requestStatus(uint32_t token) @@ -350,25 +357,25 @@ bool RsGxsDataAccess::cancelRequest(const uint32_t& token) bool RsGxsDataAccess::clearRequest(const uint32_t& token) { - RsStackMutex stack(mDataMutex); /****** LOCKED *****/ + RS_STACK_MUTEX(mDataMutex); + return locked_clearRequest(token); +} - std::map::iterator it; +bool RsGxsDataAccess::locked_clearRequest(const uint32_t& token) +{ + auto it = mCompletedRequests.find(token); - it = mRequests.find(token); - if (it == mRequests.end()) - { - return false; - } + if(it == mCompletedRequests.end()) + return false; - delete it->second; - mRequests.erase(it); + delete it->second; + mCompletedRequests.erase(it); - return true; + return true; } bool RsGxsDataAccess::getGroupSummary(const uint32_t& token, std::list& groupInfo) { - RS_STACK_MUTEX(mDataMutex); GxsRequest* req = locked_retrieveRequest(token); @@ -379,29 +386,21 @@ bool RsGxsDataAccess::getGroupSummary(const uint32_t& token, std::liststatus == COMPLETE) - { - GroupMetaReq* gmreq = dynamic_cast(req); - if(gmreq) - { - groupInfo = gmreq->mGroupMetaData; - gmreq->mGroupMetaData.clear(); - locked_updateRequestStatus(token, DONE); - } - else - { - std::cerr << "RsGxsDataAccess::getGroupSummary() Req found, failed" - << "cast" << std::endl; - return false; - } + GroupMetaReq* gmreq = dynamic_cast(req); + + if(gmreq) + { + groupInfo = gmreq->mGroupMetaData; + gmreq->mGroupMetaData.clear(); } else { - std::cerr << "RsGxsDataAccess::getGroupSummary() Req not ready" - << std::endl; + std::cerr << "RsGxsDataAccess::getGroupSummary() Req found, failed" + << "cast" << std::endl; return false; } + locked_clearRequest(token); return true; } @@ -418,36 +417,27 @@ bool RsGxsDataAccess::getGroupData(const uint32_t& token, std::list& << "data" << std::endl; return false; } - else if(req->status == COMPLETE) + + GroupDataReq* gmreq = dynamic_cast(req); + GroupSerializedDataReq* gsreq = dynamic_cast(req); + + if(gsreq) { - GroupDataReq* gmreq = dynamic_cast(req); - GroupSerializedDataReq* gsreq = dynamic_cast(req); - - if(gsreq) - { - grpData.swap(gsreq->mGroupData); - gsreq->mGroupData.clear(); - - locked_updateRequestStatus(token, DONE); - } - else if(gmreq) - { - grpData.swap(gmreq->mGroupData); - gmreq->mGroupData.clear(); - locked_updateRequestStatus(token, DONE); - } - else - { - std::cerr << "RsGxsDataAccess::getGroupData() Req found, failed cast" - << " req->reqType: " << req->reqType << std::endl; - return false; - } + grpData.swap(gsreq->mGroupData); + gsreq->mGroupData.clear(); + } + else if(gmreq) + { + grpData.swap(gmreq->mGroupData); + gmreq->mGroupData.clear(); } else { - std::cerr << "RsGxsDataAccess::getGroupData() Req not ready" << std::endl; + std::cerr << "RsGxsDataAccess::getGroupData() Req found, failed cast" + << " req->reqType: " << req->reqType << std::endl; return false; } + locked_clearRequest(token); return true; } @@ -459,29 +449,26 @@ bool RsGxsDataAccess::getMsgData(const uint32_t& token, NxsMsgDataResult& msgDat GxsRequest* req = locked_retrieveRequest(token); - if(req == NULL){ + if(req == NULL) + { std::cerr << "RsGxsDataAccess::getMsgData() Unable to retrieve group data" << std::endl; return false; - }else if(req->status == COMPLETE){ + } - MsgDataReq* mdreq = dynamic_cast(req); + MsgDataReq* mdreq = dynamic_cast(req); - if(mdreq) - { - msgData.swap(mdreq->mMsgData); - mdreq->mMsgData.clear(); - locked_updateRequestStatus(token, DONE); - } - else - { - std::cerr << "RsGxsDataAccess::getMsgData() Req found, failed caste" << std::endl; - return false; - } - }else{ - std::cerr << "RsGxsDataAccess::getMsgData() Req not ready" << std::endl; + if(mdreq) + { + msgData.swap(mdreq->mMsgData); + mdreq->mMsgData.clear(); + } + else + { + std::cerr << "RsGxsDataAccess::getMsgData() Req found, failed caste" << std::endl; return false; } + locked_clearRequest(token); return true; } @@ -493,32 +480,28 @@ bool RsGxsDataAccess::getMsgRelatedData(const uint32_t &token, NxsMsgRelatedData GxsRequest* req = locked_retrieveRequest(token); - if(req == NULL){ + if(req == NULL) + { + std::cerr << "RsGxsDataAccess::getMsgRelatedData() Unable to retrieve group data" << std::endl; + return false; + } + MsgRelatedInfoReq* mrireq = dynamic_cast(req); - std::cerr << "RsGxsDataAccess::getMsgRelatedData() Unable to retrieve group data" << std::endl; - return false; - }else if(req->status == COMPLETE){ + if(req->Options.mReqType != GXS_REQUEST_TYPE_MSG_RELATED_DATA) + return false; - MsgRelatedInfoReq* mrireq = dynamic_cast(req); + if(mrireq) + { + msgData.swap(mrireq->mMsgDataResult); + mrireq->mMsgDataResult.clear(); + } + else + { + std::cerr << "RsGxsDataAccess::getMsgRelatedData() Req found, failed caste" << std::endl; + return false; + } - if(req->Options.mReqType != GXS_REQUEST_TYPE_MSG_RELATED_DATA) - return false; - - if(mrireq) - { - msgData.swap(mrireq->mMsgDataResult); - mrireq->mMsgDataResult.clear(); - locked_updateRequestStatus(token, DONE); - } - else - { - std::cerr << "RsGxsDataAccess::getMsgRelatedData() Req found, failed caste" << std::endl; - return false; - } - }else{ - std::cerr << "RsGxsDataAccess::getMsgRelatedData() Req not ready" << std::endl; - return false; - } + locked_clearRequest(token); return true; } @@ -530,111 +513,86 @@ bool RsGxsDataAccess::getMsgSummary(const uint32_t& token, GxsMsgMetaResult& msg GxsRequest* req = locked_retrieveRequest(token); - if(req == NULL){ - + if(req == NULL) + { std::cerr << "RsGxsDataAccess::getMsgSummary() Unable to retrieve group data" << std::endl; return false; - }else if(req->status == COMPLETE){ + } + MsgMetaReq* mmreq = dynamic_cast(req); - MsgMetaReq* mmreq = dynamic_cast(req); - - if(mmreq) - { - msgInfo.swap(mmreq->mMsgMetaData); - mmreq->mMsgMetaData.clear(); - locked_updateRequestStatus(token, DONE); - - } - else - { - std::cerr << "RsGxsDataAccess::getMsgSummary() Req found, failed caste" << std::endl; - return false; - } - }else{ - std::cerr << "RsGxsDataAccess::getMsgSummary() Req not ready" << std::endl; + if(mmreq) + { + msgInfo.swap(mmreq->mMsgMetaData); + mmreq->mMsgMetaData.clear(); + } + else + { + std::cerr << "RsGxsDataAccess::getMsgSummary() Req found, failed caste" << std::endl; return false; } - + locked_clearRequest(token); return true; } bool RsGxsDataAccess::getMsgRelatedSummary(const uint32_t &token, MsgRelatedMetaResult &msgMeta) { + RsStackMutex stack(mDataMutex); - RsStackMutex stack(mDataMutex); + GxsRequest* req = locked_retrieveRequest(token); - GxsRequest* req = locked_retrieveRequest(token); + if(req == NULL) + { + std::cerr << "RsGxsDataAccess::getMsgRelatedSummary() Unable to retrieve message summary" << std::endl; + return false; + } + if(req->Options.mReqType != GXS_REQUEST_TYPE_MSG_RELATED_META) + return false; + MsgRelatedInfoReq* mrireq = dynamic_cast(req); - - if(req == NULL){ - - std::cerr << "RsGxsDataAccess::getMsgRelatedSummary() Unable to retrieve message summary" << std::endl; - return false; - }else if(req->status == COMPLETE){ - - if(req->Options.mReqType != GXS_REQUEST_TYPE_MSG_RELATED_META) - return false; - - MsgRelatedInfoReq* mrireq = dynamic_cast(req); - - if(mrireq) - { - msgMeta.swap(mrireq->mMsgMetaResult); - mrireq->mMsgMetaResult.clear(); - locked_updateRequestStatus(token, DONE); - } - else - { - std::cerr << "RsGxsDataAccess::getMsgRelatedSummary() Req found, failed caste" << std::endl; - return false; - } - } - else - { - std::cerr << "RsGxsDataAccess::getMsgRelatedSummary() Req not ready" << std::endl; - return false; - } - - return true; + if(mrireq) + { + msgMeta.swap(mrireq->mMsgMetaResult); + mrireq->mMsgMetaResult.clear(); + } + else + { + std::cerr << "RsGxsDataAccess::getMsgRelatedSummary() Req found, failed caste" << std::endl; + return false; + } + locked_clearRequest(token); + return true; } bool RsGxsDataAccess::getMsgRelatedList(const uint32_t &token, MsgRelatedIdResult &msgIds) { - RsStackMutex stack(mDataMutex); + RsStackMutex stack(mDataMutex); - GxsRequest* req = locked_retrieveRequest(token); + GxsRequest* req = locked_retrieveRequest(token); - if(req == NULL){ + if(req == NULL) + { + std::cerr << "RsGxsDataAccess::getMsgRelatedList() Unable to retrieve message data" << std::endl; + return false; + } + if(req->Options.mReqType != GXS_REQUEST_TYPE_MSG_RELATED_IDS) + return false; - std::cerr << "RsGxsDataAccess::getMsgRelatedList() Unable to retrieve message data" << std::endl; - return false; - }else if(req->status == COMPLETE){ + MsgRelatedInfoReq* mrireq = dynamic_cast(req); - if(req->Options.mReqType != GXS_REQUEST_TYPE_MSG_RELATED_IDS) - return false; - - MsgRelatedInfoReq* mrireq = dynamic_cast(req); - - if(mrireq) - { - msgIds.swap(mrireq->mMsgIdResult); - mrireq->mMsgIdResult.clear(); - locked_updateRequestStatus(token, DONE); - } - else{ - std::cerr << "RsGxsDataAccess::getMsgRelatedList() Req found, failed caste" << std::endl; - return false; - } - } - else - { - std::cerr << "RsGxsDataAccess::getMsgRelatedList() Req not ready" << std::endl; - return false; - } - - return true; + if(mrireq) + { + msgIds.swap(mrireq->mMsgIdResult); + mrireq->mMsgIdResult.clear(); + } + else + { + std::cerr << "RsGxsDataAccess::getMsgRelatedList() Req found, failed caste" << std::endl; + return false; + } + locked_clearRequest(token); + return true; } bool RsGxsDataAccess::getMsgList(const uint32_t& token, GxsMsgIdResult& msgIds) @@ -643,29 +601,24 @@ bool RsGxsDataAccess::getMsgList(const uint32_t& token, GxsMsgIdResult& msgIds) GxsRequest* req = locked_retrieveRequest(token); - if(req == NULL){ - - std::cerr << "RsGxsDataAccess::getMsgList() Unable to retrieve msg Ids" << std::endl; - return false; - }else if(req->status == COMPLETE){ - - MsgIdReq* mireq = dynamic_cast(req); - - if(mireq) - { - msgIds.swap(mireq->mMsgIdResult); - mireq->mMsgIdResult.clear(); - locked_updateRequestStatus(token, DONE); - } - else{ - std::cerr << "RsGxsDataAccess::getMsgList() Req found, failed caste" << std::endl; - return false; - } - }else{ - std::cerr << "RsGxsDataAccess::getMsgList() Req not ready" << std::endl; + if(req == NULL) + { + std::cerr << "RsGxsDataAccess::getMsgList() Unable to retrieve msg Ids" << std::endl; return false; } + MsgIdReq* mireq = dynamic_cast(req); + if(mireq) + { + msgIds.swap(mireq->mMsgIdResult); + mireq->mMsgIdResult.clear(); + } + else + { + std::cerr << "RsGxsDataAccess::getMsgList() Req found, failed caste" << std::endl; + return false; + } + locked_clearRequest(token); return true; } @@ -675,125 +628,126 @@ bool RsGxsDataAccess::getGroupList(const uint32_t& token, std::liststatus == COMPLETE){ - - GroupIdReq* gireq = dynamic_cast(req); - - if(gireq) - { - groupIds.swap(gireq->mGroupIdResult); - gireq->mGroupIdResult.clear(); - locked_updateRequestStatus(token, DONE); - - }else{ - std::cerr << "RsGxsDataAccess::getGroupList() Req found, failed caste" << std::endl; - return false; - } - }else{ - std::cerr << "RsGxsDataAccess::getGroupList() Req not ready" << std::endl; + if(req == NULL) + { + std::cerr << "RsGxsDataAccess::getGroupList() Unable to retrieve group Ids, Request does not exist" << std::endl; return false; } + GroupIdReq* gireq = dynamic_cast(req); + if(gireq) + { + groupIds.swap(gireq->mGroupIdResult); + gireq->mGroupIdResult.clear(); + } + else + { + std::cerr << "RsGxsDataAccess::getGroupList() Req found, failed caste" << std::endl; + return false; + } + locked_clearRequest(token); return true; } +bool RsGxsDataAccess::getGroupStatistic(const uint32_t &token, GxsGroupStatistic &grpStatistic) +{ + RsStackMutex stack(mDataMutex); + + GxsRequest* req = locked_retrieveRequest(token); + + if(req == NULL) + { + std::cerr << "RsGxsDataAccess::getGroupStatistic() Unable to retrieve grp stats" << std::endl; + return false; + } + GroupStatisticRequest* gsreq = dynamic_cast(req); + + if(gsreq) + grpStatistic = gsreq->mGroupStatistic; + else + { + std::cerr << "RsGxsDataAccess::getGroupStatistic() Req found, failed caste" << std::endl; + return false; + } + locked_clearRequest(token); + return true; +} + +bool RsGxsDataAccess::getServiceStatistic(const uint32_t &token, GxsServiceStatistic &servStatistic) +{ + RsStackMutex stack(mDataMutex); + + GxsRequest* req = locked_retrieveRequest(token); + + if(req == NULL) + { + std::cerr << "RsGxsDataAccess::getServiceStatistic() Unable to retrieve service stats" << std::endl; + return false; + } + ServiceStatisticRequest* ssreq = dynamic_cast(req); + + if(ssreq) + servStatistic = ssreq->mServiceStatistic; + else + { + std::cerr << "RsGxsDataAccess::getServiceStatistic() Req found, failed caste" << std::endl; + return false; + } + locked_clearRequest(token); + return true; +} GxsRequest* RsGxsDataAccess::locked_retrieveRequest(const uint32_t& token) { + auto it = mCompletedRequests.find(token) ; - if(mRequests.find(token) == mRequests.end()) return NULL; + if(it == mCompletedRequests.end()) + return NULL; - GxsRequest* req = mRequests[token]; - - return req; + return it->second; } #define MAX_REQUEST_AGE 120 // 2 minutes void RsGxsDataAccess::processRequests() { - std::list toClear; - rstime_t now = time(NULL); - std::map::iterator it; - - { - RsStackMutex stack(mDataMutex); /******* LOCKED *******/ - - // process status of the requests - for (it = mRequests.begin(); it != mRequests.end(); ++it) - { - GxsRequest* req = it->second; - - switch (req->status) - { - case PENDING: - // process request later - break; - case PARTIAL: - // should not happen - req->status = COMPLETE; - break; - case DONE: -#ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::processrequests() Clearing Done Request Token: " << req->token; - std::cerr << std::endl; -#endif - toClear.push_back(req->token); - break; - case CANCELLED: -#ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::processrequests() Clearing Cancelled Request Token: " << req->token; - std::cerr << std::endl; -#endif - toClear.push_back(req->token); - break; - default: - if (now - req->reqTime > MAX_REQUEST_AGE) - { -#ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::processrequests() Clearing Old Request Token: " << req->token; - std::cerr << std::endl; -#endif - toClear.push_back(req->token); - } - } - } - } // END OF MUTEX. - - // clear requests - std::list::iterator cit; - for (cit = toClear.begin(); cit != toClear.end(); ++cit) - { - clearRequest(*cit); - } - - // We hve to make a copy of the mRequest list because it can be modified while we treat the requests. - // This may happen because the mutex cannot be added around the full loop since it takes too much time. - - std::map request_list_copy; - - { - RS_STACK_MUTEX(mDataMutex); - request_list_copy = mRequests; - } // process requests - while (true) + + while (!mRequestQueue.empty()) { - GxsRequest* req = NULL; + // Extract the first elements from the request queue. cleanup all other elements marked at terminated. + + GxsRequest* req = nullptr; { - // get the first pending request - for (auto it:request_list_copy) - if (it.second->status == PENDING) - { - req = it.second; + RsStackMutex stack(mDataMutex); /******* LOCKED *******/ + rstime_t now = time(NULL); // this is ok while in the loop below + + while(!mRequestQueue.empty() && req == nullptr) + { + if(now > mRequestQueue.begin()->second->reqTime + MAX_REQUEST_AGE) + { + mRequestQueue.erase(mRequestQueue.begin()); + continue; + } + + switch( mRequestQueue.begin()->second->status ) + { + case PARTIAL: + case COMPLETE: + case DONE: + RsErr() << "Found partial/done/complete request in mRequestQueue. This is a bug." << std::endl; // fallthrough + case FAILED: + case CANCELLED: + mRequestQueue.erase(mRequestQueue.begin()); + continue; + break; + case PENDING: + req = mRequestQueue.begin()->second; req->status = PARTIAL; - break; - } - } + break; + } + + } + } if (!req) break; @@ -813,7 +767,7 @@ void RsGxsDataAccess::processRequests() #ifdef DATA_DEBUG std::cerr << "RsGxsDataAccess::processRequests() Processing Token: " << req->token << " Status: " << req->status << " ReqType: " << req->reqType << " Age: " - << now - req->reqTime << std::endl; + << time(NULL) - req->reqTime << std::endl; #endif /* PROCESS REQUEST! */ @@ -866,77 +820,26 @@ void RsGxsDataAccess::processRequests() << req->token << std::endl; } + // We cannot easily remove the request here because the queue may have more elements now and mRequestQueue.begin() is not necessarily the same element. + // but we mark it as COMPLETE/FAILED so that it will be removed in the next loop. { RsStackMutex stack(mDataMutex); /******* LOCKED *******/ - if (req->status == PARTIAL) - { - req->status = ok ? COMPLETE : FAILED; - } - } // END OF MUTEX. - } -} -bool RsGxsDataAccess::getGroupStatistic(const uint32_t &token, GxsGroupStatistic &grpStatistic) -{ - RsStackMutex stack(mDataMutex); + if(ok) + { + // When the request is complete, we move it to the complete list, so that the caller can easily retrieve the request data - GxsRequest* req = locked_retrieveRequest(token); - - if(req == NULL){ - - std::cerr << "RsGxsDataAccess::getGroupStatistic() Unable to retrieve grp stats" << std::endl; - return false; - }else if(req->status == COMPLETE){ - - GroupStatisticRequest* gsreq = dynamic_cast(req); - - if(gsreq) - { - grpStatistic = gsreq->mGroupStatistic; - locked_updateRequestStatus(token, DONE); - } - else{ - std::cerr << "RsGxsDataAccess::getGroupStatistic() Req found, failed caste" << std::endl; - return false; + req->status = COMPLETE ; + mCompletedRequests[req->token] = req; + } + else + req->status = FAILED; } - }else{ - std::cerr << "RsGxsDataAccess::getGroupStatistic() Req not ready" << std::endl; - return false; - } - return true; + } // END OF MUTEX. } -bool RsGxsDataAccess::getServiceStatistic(const uint32_t &token, GxsServiceStatistic &servStatistic) -{ - RsStackMutex stack(mDataMutex); - GxsRequest* req = locked_retrieveRequest(token); - - if(req == NULL){ - - std::cerr << "RsGxsDataAccess::getServiceStatistic() Unable to retrieve service stats" << std::endl; - return false; - }else if(req->status == COMPLETE){ - - ServiceStatisticRequest* ssreq = dynamic_cast(req); - - if(ssreq) - { - servStatistic = ssreq->mServiceStatistic; - locked_updateRequestStatus(token, DONE); - } - else{ - std::cerr << "RsGxsDataAccess::getServiceStatistic() Req found, failed caste" << std::endl; - return false; - } - }else{ - std::cerr << "RsGxsDataAccess::getServiceStatistic() Req not ready" << std::endl; - return false; - } - - return true; -} bool RsGxsDataAccess::getGroupSerializedData(GroupSerializedDataReq* req) { @@ -1847,16 +1750,14 @@ void RsGxsDataAccess::tokenList(std::list& tokens) RsStackMutex stack(mDataMutex); - std::map::iterator mit = mRequests.begin(); + for(auto& it:mRequestQueue) + tokens.push_back(it.second->token); - for(; mit != mRequests.end(); ++mit) - { - tokens.push_back(mit->first); - } + for(auto& it:mCompletedRequests) + tokens.push_back(it.first); } -bool RsGxsDataAccess::locked_updateRequestStatus( - uint32_t token, RsTokenService::GxsRequestStatus status ) +bool RsGxsDataAccess::locked_updateRequestStatus( uint32_t token, RsTokenService::GxsRequestStatus status ) { GxsRequest* req = locked_retrieveRequest(token); diff --git a/libretroshare/src/gxs/rsgxsdataaccess.h b/libretroshare/src/gxs/rsgxsdataaccess.h index c5bfdf084..a8ba7f26d 100644 --- a/libretroshare/src/gxs/rsgxsdataaccess.h +++ b/libretroshare/src/gxs/rsgxsdataaccess.h @@ -22,6 +22,7 @@ #ifndef RSGXSDATAACCESS_H #define RSGXSDATAACCESS_H +#include #include "retroshare/rstokenservice.h" #include "rsgxsrequesttypes.h" #include "rsgds.h" @@ -30,6 +31,8 @@ typedef std::map< RsGxsGroupId, std::map > MsgMetaFilter; typedef std::map< RsGxsGroupId, RsGxsGrpMetaData* > GrpMetaFilter; +bool operator<(const std::pair& p1,const std::pair& p2); + class RsGxsDataAccess : public RsTokenService { public: @@ -485,6 +488,7 @@ private: bool getMsgList(const GxsMsgReq& msgIds, const RsTokReqOptions& opts, GxsMsgReq& msgIdsOut); private: + bool locked_clearRequest(const uint32_t &token); RsGeneralDataService* mDataStore; @@ -492,10 +496,9 @@ private: uint32_t mNextToken; std::map mPublicToken; - std::map mRequests; - - + std::set > mRequestQueue; + std::map mCompletedRequests; }; #endif // RSGXSDATAACCESS_H diff --git a/libretroshare/src/gxs/rsgxsrequesttypes.h b/libretroshare/src/gxs/rsgxsrequesttypes.h index 7af50b135..acef43eaa 100644 --- a/libretroshare/src/gxs/rsgxsrequesttypes.h +++ b/libretroshare/src/gxs/rsgxsrequesttypes.h @@ -26,6 +26,14 @@ #include "gxs/rsgds.h" #include "util/rsdeprecate.h" +enum class GxsRequestPriority { + VERY_HIGH = 0x00, + HIGH = 0x01, + NORMAL = 0x02, + LOW = 0x03, + VERY_LOW = 0x04, +}; + struct GxsRequest { GxsRequest() : @@ -38,6 +46,7 @@ struct GxsRequest RS_DEPRECATED uint32_t ansType; /// G10h4ck: This is of no use uint32_t reqType; + GxsRequestPriority priority; RsTokReqOptions Options; RsTokenService::GxsRequestStatus status;