diff --git a/libretroshare/src/gxs/rsdataservice.cc b/libretroshare/src/gxs/rsdataservice.cc index cc215dc14..6bdc924ed 100644 --- a/libretroshare/src/gxs/rsdataservice.cc +++ b/libretroshare/src/gxs/rsdataservice.cc @@ -26,10 +26,6 @@ * #define RS_DATA_SERVICE_DEBUG_CACHE 1 ****/ -#define RS_DATA_SERVICE_DEBUG 1 -#define RS_DATA_SERVICE_DEBUG_TIME 1 -#define RS_DATA_SERVICE_DEBUG_CACHE 1 - #include #include #include diff --git a/libretroshare/src/gxs/rsgenexchange.cc b/libretroshare/src/gxs/rsgenexchange.cc index c7c274fa4..af4a02fcd 100644 --- a/libretroshare/src/gxs/rsgenexchange.cc +++ b/libretroshare/src/gxs/rsgenexchange.cc @@ -1193,7 +1193,7 @@ bool RsGenExchange::getGroupList(const uint32_t &token, std::list bool RsGenExchange::getMsgList(const uint32_t &token, GxsMsgIdResult &msgIds) { - return mDataAccess->getMsgList(token, msgIds); + return mDataAccess->getMsgIdList(token, msgIds); } bool RsGenExchange::getMsgRelatedList(const uint32_t &token, MsgRelatedIdResult &msgIds) @@ -1692,7 +1692,7 @@ void RsGenExchange::notifyChangedGroupStats(const RsGxsGroupId &grpId) { RS_STACK_MUTEX(mGenMtx); - RsGxsGroupChange* gc = new RsGxsGroupChange(RsGxsNotify::TYPE_PROCESSED, false); + RsGxsGroupChange* gc = new RsGxsGroupChange(RsGxsNotify::TYPE_STATISTICS_CHANGED, false); gc->mGrpIdList.push_back(grpId); mNotifications.push_back(gc); } diff --git a/libretroshare/src/gxs/rsgxsdataaccess.cc b/libretroshare/src/gxs/rsgxsdataaccess.cc index f8300b37e..e5f26dbb3 100644 --- a/libretroshare/src/gxs/rsgxsdataaccess.cc +++ b/libretroshare/src/gxs/rsgxsdataaccess.cc @@ -30,7 +30,11 @@ * #define DATA_DEBUG 1 **********/ -#define DATA_DEBUG 1 +bool operator<(const std::pair& p1,const std::pair& p2) +{ + return p1.second->Options.mPriority <= p2.second->Options.mPriority ; // <= so that new elements with same priority are inserted before +} + RsGxsDataAccess::RsGxsDataAccess(RsGeneralDataService* ds) : mDataStore(ds), mDataMutex("RsGxsDataAccess"), mNextToken(0) {} @@ -38,21 +42,18 @@ RsGxsDataAccess::RsGxsDataAccess(RsGeneralDataService* ds) : RsGxsDataAccess::~RsGxsDataAccess() { - for(std::map::const_iterator it(mRequests.begin());it!=mRequests.end();++it) - delete it->second ; + for(auto& it:mRequestQueue) + delete it.second; } -bool RsGxsDataAccess::requestGroupInfo( - uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts, - const std::list &groupIds ) +bool RsGxsDataAccess::requestGroupInfo( uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts, const std::list &groupIds ) { if(groupIds.empty()) { - std::cerr << __PRETTY_FUNCTION__ << " (WW) Group Id list is empty!" - << std::endl; + RsErr() << __PRETTY_FUNCTION__ << " (WW) Group Id list is empty!" << std::endl; return false; } - GxsRequest* req = NULL; + GxsRequest* req = nullptr; uint32_t reqType = opts.mReqType; if(reqType & GXS_REQUEST_TYPE_GROUP_META) @@ -82,16 +83,14 @@ bool RsGxsDataAccess::requestGroupInfo( if(!req) { - std::cerr << __PRETTY_FUNCTION__ << " request type not recognised, " - << "reqType: " << reqType << std::endl; + RsErr() << __PRETTY_FUNCTION__ << " request type not recognised, " << "reqType: " << reqType << std::endl; return false; } generateToken(token); #ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::requestGroupInfo() gets token: " << token - << std::endl; + RsErr() << "RsGxsDataAccess::requestGroupInfo() gets token: " << token << std::endl; #endif setReq(req, token, ansType, opts); @@ -103,7 +102,7 @@ bool RsGxsDataAccess::requestGroupInfo( bool RsGxsDataAccess::requestGroupInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts) { - GxsRequest* req = NULL; + GxsRequest* req = nullptr; uint32_t reqType = opts.mReqType; if(reqType & GXS_REQUEST_TYPE_GROUP_META) @@ -116,14 +115,13 @@ bool RsGxsDataAccess::requestGroupInfo(uint32_t &token, uint32_t ansType, const req = new GroupSerializedDataReq(); else { - std::cerr << "RsGxsDataAccess::requestGroupInfo() request type not recognised, type " - << reqType << std::endl; + RsErr() << "RsGxsDataAccess::requestGroupInfo() request type not recognised, type " << reqType << std::endl; return false; } generateToken(token); #ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::requestGroupInfo() gets Token: " << token << std::endl; + RsErr() << "RsGxsDataAccess::requestGroupInfo() gets Token: " << token << std::endl; #endif setReq(req, token, ansType, opts); @@ -139,11 +137,10 @@ void RsGxsDataAccess::generateToken(uint32_t &token) } -bool RsGxsDataAccess::requestMsgInfo(uint32_t &token, uint32_t ansType, - const RsTokReqOptions &opts, const GxsMsgReq &msgIds) +bool RsGxsDataAccess::requestMsgInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts, const GxsMsgReq &msgIds) { - GxsRequest* req = NULL; + GxsRequest* req = nullptr; uint32_t reqType = opts.mReqType; // remove all empty grpId entries @@ -183,16 +180,15 @@ bool RsGxsDataAccess::requestMsgInfo(uint32_t &token, uint32_t ansType, } - if(req == NULL) + if(req == nullptr) { - std::cerr << "RsGxsDataAccess::requestMsgInfo() request type not recognised, type " - << reqType << std::endl; + RsErr() << "RsGxsDataAccess::requestMsgInfo() request type not recognised, type " << reqType << std::endl; return false; }else { generateToken(token); #ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::requestMsgInfo() gets Token: " << token << std::endl; + RsErr() << "RsGxsDataAccess::requestMsgInfo() gets Token: " << token << std::endl; #endif } @@ -201,10 +197,9 @@ bool RsGxsDataAccess::requestMsgInfo(uint32_t &token, uint32_t ansType, return true; } -bool RsGxsDataAccess::requestMsgInfo(uint32_t &token, uint32_t ansType, - const RsTokReqOptions &opts, const std::list& grpIds) +bool RsGxsDataAccess::requestMsgInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts, const std::list& grpIds) { - GxsRequest* req = NULL; + GxsRequest* req = nullptr; uint32_t reqType = opts.mReqType; std::list::const_iterator lit = grpIds.begin(); @@ -237,16 +232,15 @@ bool RsGxsDataAccess::requestMsgInfo(uint32_t &token, uint32_t ansType, req = mir; } - if(req == NULL) + if(req == nullptr) { - std::cerr << "RsGxsDataAccess::requestMsgInfo() request type not recognised, type " - << reqType << std::endl; + RsErr() << __PRETTY_FUNCTION__ << " request type not recognised, type " << reqType << std::endl; return false; }else { generateToken(token); #ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::requestMsgInfo() gets Token: " << token << std::endl; + RsErr() << __PRETTY_FUNCTION__ << " gets Token: " << token << std::endl; #endif } @@ -256,33 +250,40 @@ bool RsGxsDataAccess::requestMsgInfo(uint32_t &token, uint32_t ansType, } -void RsGxsDataAccess::requestServiceStatistic(uint32_t& token) +void RsGxsDataAccess::requestServiceStatistic(uint32_t& token,const RsTokReqOptions& opts) { ServiceStatisticRequest* req = new ServiceStatisticRequest(); generateToken(token); - RsTokReqOptions opts; - opts.mReqType = GXS_REQUEST_TYPE_SERVICE_STATS; + if(opts.mReqType != GXS_REQUEST_TYPE_SERVICE_STATS) + { + RsErr() << "Expected opts.mReqType to be GXS_REQUEST_TYPE_SERVICE_STATS requestServiceStatistic()" << std::endl; + return; + } + setReq(req, token, 0, opts); storeRequest(req); } -void RsGxsDataAccess::requestGroupStatistic(uint32_t& token, const RsGxsGroupId& grpId) +void RsGxsDataAccess::requestGroupStatistic(uint32_t& token, const RsGxsGroupId& grpId,const RsTokReqOptions& opts) { GroupStatisticRequest* req = new GroupStatisticRequest(); req->mGrpId = grpId; + if(opts.mReqType != GXS_REQUEST_TYPE_GROUP_STATS) + { + RsErr() << "Expected opts.mReqType to be GXS_REQUEST_TYPE_SERVICE_STATS requestServiceStatistic()" << std::endl; + return; + } + generateToken(token); - RsTokReqOptions opts; - opts.mReqType = GXS_REQUEST_TYPE_GROUP_STATS; - setReq(req, token, 0, opts); + setReq(req, token,0, opts); storeRequest(req); } -bool RsGxsDataAccess::requestMsgRelatedInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts, - const std::vector &msgIds) +bool RsGxsDataAccess::requestMsgRelatedInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts, const std::vector &msgIds) { MsgRelatedInfoReq* req = new MsgRelatedInfoReq(); @@ -300,7 +301,7 @@ bool RsGxsDataAccess::requestMsgRelatedInfo(uint32_t &token, uint32_t ansType, c void RsGxsDataAccess::setReq(GxsRequest* req, uint32_t token, uint32_t ansType, const RsTokReqOptions& opts) const { req->token = token; - req->ansType = ansType; + req->clientAnswerType = ansType; req->Options = opts; return; } @@ -308,8 +309,18 @@ void RsGxsDataAccess::storeRequest(GxsRequest* req) { RS_STACK_MUTEX(mDataMutex); req->status = PENDING; - req->reqTime = time(NULL); - mRequests[req->token] = req; + req->reqTime = time(nullptr); + + mRequestQueue.insert(std::make_pair(req->token,req)); + mPublicToken[req->token] = PENDING; + +#ifdef DATA_DEBUG + RsErr() << "Stored request token=" << req->token << " priority = " << static_cast(req->Options.mPriority) << " Current request Queue is:" ; + for(auto it(mRequestQueue.begin());it!=mRequestQueue.end();++it) + RsErr() << it->first << " (p=" << static_cast(req->Options.mPriority) << ") "; + std::cerr << std::endl; + RsErr() << "Completed requests waiting for client: " << mCompletedRequests.size() << std::endl; +#endif } RsTokenService::GxsRequestStatus RsGxsDataAccess::requestStatus(uint32_t token) @@ -319,14 +330,6 @@ RsTokenService::GxsRequestStatus RsGxsDataAccess::requestStatus(uint32_t token) uint32_t anstype; rstime_t ts; - { - RS_STACK_MUTEX(mDataMutex); - - // first check public tokens - if(mPublicToken.find(token) != mPublicToken.end()) - return mPublicToken[token]; - } - if (!checkRequestStatus(token, status, reqtype, anstype, ts)) return RsTokenService::FAILED; @@ -337,7 +340,7 @@ bool RsGxsDataAccess::cancelRequest(const uint32_t& token) { RsStackMutex stack(mDataMutex); /****** LOCKED *****/ - GxsRequest* req = locked_retrieveRequest(token); + GxsRequest* req = locked_retrieveCompetedRequest(token); if (!req) { return false; @@ -350,58 +353,52 @@ bool RsGxsDataAccess::cancelRequest(const uint32_t& token) bool RsGxsDataAccess::clearRequest(const uint32_t& token) { - RsStackMutex stack(mDataMutex); /****** LOCKED *****/ + RS_STACK_MUTEX(mDataMutex); + return locked_clearRequest(token); +} - std::map::iterator it; +bool RsGxsDataAccess::locked_clearRequest(const uint32_t& token) +{ + auto it = mCompletedRequests.find(token); - it = mRequests.find(token); - if (it == mRequests.end()) - { - return false; - } + if(it == mCompletedRequests.end()) + return false; - delete it->second; - mRequests.erase(it); + delete it->second; + mCompletedRequests.erase(it); - return true; + auto it2 = mPublicToken.find(token); + if(it2 != mPublicToken.end()) + mPublicToken.erase(it2); + + return true; } bool RsGxsDataAccess::getGroupSummary(const uint32_t& token, std::list& groupInfo) { - RS_STACK_MUTEX(mDataMutex); - GxsRequest* req = locked_retrieveRequest(token); + GxsRequest* req = locked_retrieveCompetedRequest(token); - if(req == NULL) + if(req == nullptr) { - std::cerr << "RsGxsDataAccess::getGroupSummary() Unable to retrieve " - << "group summary" << std::endl; + RsErr() << __PRETTY_FUNCTION__ << " Unable to retrieve group summary" << std::endl; return false; } - else if(req->status == COMPLETE) - { - GroupMetaReq* gmreq = dynamic_cast(req); - if(gmreq) - { - groupInfo = gmreq->mGroupMetaData; - gmreq->mGroupMetaData.clear(); - locked_updateRequestStatus(token, DONE); - } - else - { - std::cerr << "RsGxsDataAccess::getGroupSummary() Req found, failed" - << "cast" << std::endl; - return false; - } + GroupMetaReq* gmreq = dynamic_cast(req); + + if(gmreq) + { + groupInfo = gmreq->mGroupMetaData; + gmreq->mGroupMetaData.clear(); } else { - std::cerr << "RsGxsDataAccess::getGroupSummary() Req not ready" - << std::endl; + RsErr() << __PRETTY_FUNCTION__ << " Req found, failed cast" << std::endl; return false; } + locked_clearRequest(token); return true; } @@ -410,44 +407,33 @@ bool RsGxsDataAccess::getGroupData(const uint32_t& token, std::list& { RS_STACK_MUTEX(mDataMutex); - GxsRequest* req = locked_retrieveRequest(token); + GxsRequest* req = locked_retrieveCompetedRequest(token); - if(req == NULL) + if(req == nullptr) { - std::cerr << "RsGxsDataAccess::getGroupData() Unable to retrieve group" - << "data" << std::endl; + RsErr() << "RsGxsDataAccess::getGroupData() Unable to retrieve group data" << std::endl; return false; } - else if(req->status == COMPLETE) + + GroupDataReq* gmreq = dynamic_cast(req); + GroupSerializedDataReq* gsreq = dynamic_cast(req); + + if(gsreq) { - GroupDataReq* gmreq = dynamic_cast(req); - GroupSerializedDataReq* gsreq = dynamic_cast(req); - - if(gsreq) - { - grpData.swap(gsreq->mGroupData); - gsreq->mGroupData.clear(); - - locked_updateRequestStatus(token, DONE); - } - else if(gmreq) - { - grpData.swap(gmreq->mGroupData); - gmreq->mGroupData.clear(); - locked_updateRequestStatus(token, DONE); - } - else - { - std::cerr << "RsGxsDataAccess::getGroupData() Req found, failed cast" - << " req->reqType: " << req->reqType << std::endl; - return false; - } + grpData.swap(gsreq->mGroupData); + gsreq->mGroupData.clear(); + } + else if(gmreq) + { + grpData.swap(gmreq->mGroupData); + gmreq->mGroupData.clear(); } else { - std::cerr << "RsGxsDataAccess::getGroupData() Req not ready" << std::endl; + RsErr() << __PRETTY_FUNCTION__ << " Req found, failed cast req->reqType: " << req->reqType << std::endl; return false; } + locked_clearRequest(token); return true; } @@ -457,32 +443,29 @@ bool RsGxsDataAccess::getMsgData(const uint32_t& token, NxsMsgDataResult& msgDat RsStackMutex stack(mDataMutex); - GxsRequest* req = locked_retrieveRequest(token); + GxsRequest* req = locked_retrieveCompetedRequest(token); - if(req == NULL){ + if(req == nullptr) + { - std::cerr << "RsGxsDataAccess::getMsgData() Unable to retrieve group data" << std::endl; - return false; - }else if(req->status == COMPLETE){ - - MsgDataReq* mdreq = dynamic_cast(req); - - if(mdreq) - { - msgData.swap(mdreq->mMsgData); - mdreq->mMsgData.clear(); - locked_updateRequestStatus(token, DONE); - } - else - { - std::cerr << "RsGxsDataAccess::getMsgData() Req found, failed caste" << std::endl; - return false; - } - }else{ - std::cerr << "RsGxsDataAccess::getMsgData() Req not ready" << std::endl; + RsErr() << __PRETTY_FUNCTION__ << " Unable to retrieve group data" << std::endl; return false; } + MsgDataReq* mdreq = dynamic_cast(req); + + if(mdreq) + { + msgData.swap(mdreq->mMsgData); + mdreq->mMsgData.clear(); + } + else + { + RsErr() << "RsGxsDataAccess::getMsgData() Req found, failed caste" << std::endl; + return false; + } + locked_clearRequest(token); + return true; } @@ -491,34 +474,30 @@ bool RsGxsDataAccess::getMsgRelatedData(const uint32_t &token, NxsMsgRelatedData RsStackMutex stack(mDataMutex); - GxsRequest* req = locked_retrieveRequest(token); + GxsRequest* req = locked_retrieveCompetedRequest(token); - if(req == NULL){ + if(req == nullptr) + { + RsErr() << __PRETTY_FUNCTION__ << " Unable to retrieve group data" << std::endl; + return false; + } + MsgRelatedInfoReq* mrireq = dynamic_cast(req); - std::cerr << "RsGxsDataAccess::getMsgRelatedData() Unable to retrieve group data" << std::endl; - return false; - }else if(req->status == COMPLETE){ + if(req->Options.mReqType != GXS_REQUEST_TYPE_MSG_RELATED_DATA) + return false; - MsgRelatedInfoReq* mrireq = dynamic_cast(req); + if(mrireq) + { + msgData.swap(mrireq->mMsgDataResult); + mrireq->mMsgDataResult.clear(); + } + else + { + RsErr() << __PRETTY_FUNCTION__ << " Req found, failed caste" << std::endl; + return false; + } - if(req->Options.mReqType != GXS_REQUEST_TYPE_MSG_RELATED_DATA) - return false; - - if(mrireq) - { - msgData.swap(mrireq->mMsgDataResult); - mrireq->mMsgDataResult.clear(); - locked_updateRequestStatus(token, DONE); - } - else - { - std::cerr << "RsGxsDataAccess::getMsgRelatedData() Req found, failed caste" << std::endl; - return false; - } - }else{ - std::cerr << "RsGxsDataAccess::getMsgRelatedData() Req not ready" << std::endl; - return false; - } + locked_clearRequest(token); return true; } @@ -528,144 +507,114 @@ bool RsGxsDataAccess::getMsgSummary(const uint32_t& token, GxsMsgMetaResult& msg RsStackMutex stack(mDataMutex); - GxsRequest* req = locked_retrieveRequest(token); + GxsRequest* req = locked_retrieveCompetedRequest(token); - if(req == NULL){ - - std::cerr << "RsGxsDataAccess::getMsgSummary() Unable to retrieve group data" << std::endl; - return false; - }else if(req->status == COMPLETE){ - - MsgMetaReq* mmreq = dynamic_cast(req); - - if(mmreq) - { - msgInfo.swap(mmreq->mMsgMetaData); - mmreq->mMsgMetaData.clear(); - locked_updateRequestStatus(token, DONE); - - } - else - { - std::cerr << "RsGxsDataAccess::getMsgSummary() Req found, failed caste" << std::endl; - return false; - } - }else{ - std::cerr << "RsGxsDataAccess::getMsgSummary() Req not ready" << std::endl; + if(req == nullptr) + { + RsErr() << __PRETTY_FUNCTION__ << " Unable to retrieve group data" << std::endl; return false; } + MsgMetaReq* mmreq = dynamic_cast(req); + if(mmreq) + { + msgInfo.swap(mmreq->mMsgMetaData); + mmreq->mMsgMetaData.clear(); + } + else + { + RsErr() << " Req found, failed caste" << std::endl; + return false; + } + locked_clearRequest(token); return true; } bool RsGxsDataAccess::getMsgRelatedSummary(const uint32_t &token, MsgRelatedMetaResult &msgMeta) { + RsStackMutex stack(mDataMutex); - RsStackMutex stack(mDataMutex); + GxsRequest* req = locked_retrieveCompetedRequest(token); - GxsRequest* req = locked_retrieveRequest(token); + if(req == nullptr) + { + RsErr() << __PRETTY_FUNCTION__ << " Unable to retrieve message summary" << std::endl; + return false; + } + if(req->Options.mReqType != GXS_REQUEST_TYPE_MSG_RELATED_META) + return false; + MsgRelatedInfoReq* mrireq = dynamic_cast(req); - - if(req == NULL){ - - std::cerr << "RsGxsDataAccess::getMsgRelatedSummary() Unable to retrieve message summary" << std::endl; - return false; - }else if(req->status == COMPLETE){ - - if(req->Options.mReqType != GXS_REQUEST_TYPE_MSG_RELATED_META) - return false; - - MsgRelatedInfoReq* mrireq = dynamic_cast(req); - - if(mrireq) - { - msgMeta.swap(mrireq->mMsgMetaResult); - mrireq->mMsgMetaResult.clear(); - locked_updateRequestStatus(token, DONE); - } - else - { - std::cerr << "RsGxsDataAccess::getMsgRelatedSummary() Req found, failed caste" << std::endl; - return false; - } - } - else - { - std::cerr << "RsGxsDataAccess::getMsgRelatedSummary() Req not ready" << std::endl; - return false; - } - - return true; + if(mrireq) + { + msgMeta.swap(mrireq->mMsgMetaResult); + mrireq->mMsgMetaResult.clear(); + } + else + { + RsErr() << __PRETTY_FUNCTION__ << " Req found, failed caste" << std::endl; + return false; + } + locked_clearRequest(token); + return true; } bool RsGxsDataAccess::getMsgRelatedList(const uint32_t &token, MsgRelatedIdResult &msgIds) { - RsStackMutex stack(mDataMutex); + RsStackMutex stack(mDataMutex); - GxsRequest* req = locked_retrieveRequest(token); + GxsRequest* req = locked_retrieveCompetedRequest(token); - if(req == NULL){ + if(req == nullptr) + { + RsErr() << __PRETTY_FUNCTION__ << " Unable to retrieve message data" << std::endl; + return false; + } + if(req->Options.mReqType != GXS_REQUEST_TYPE_MSG_RELATED_IDS) + return false; - std::cerr << "RsGxsDataAccess::getMsgRelatedList() Unable to retrieve message data" << std::endl; - return false; - }else if(req->status == COMPLETE){ + MsgRelatedInfoReq* mrireq = dynamic_cast(req); - if(req->Options.mReqType != GXS_REQUEST_TYPE_MSG_RELATED_IDS) - return false; - - MsgRelatedInfoReq* mrireq = dynamic_cast(req); - - if(mrireq) - { - msgIds.swap(mrireq->mMsgIdResult); - mrireq->mMsgIdResult.clear(); - locked_updateRequestStatus(token, DONE); - } - else{ - std::cerr << "RsGxsDataAccess::getMsgRelatedList() Req found, failed caste" << std::endl; - return false; - } - } - else - { - std::cerr << "RsGxsDataAccess::getMsgRelatedList() Req not ready" << std::endl; - return false; - } - - return true; + if(mrireq) + { + msgIds.swap(mrireq->mMsgIdResult); + mrireq->mMsgIdResult.clear(); + } + else + { + RsErr() << __PRETTY_FUNCTION__ << " Req found, failed caste" << std::endl; + return false; + } + locked_clearRequest(token); + return true; } -bool RsGxsDataAccess::getMsgList(const uint32_t& token, GxsMsgIdResult& msgIds) +bool RsGxsDataAccess::getMsgIdList(const uint32_t& token, GxsMsgIdResult& msgIds) { RsStackMutex stack(mDataMutex); - GxsRequest* req = locked_retrieveRequest(token); + GxsRequest* req = locked_retrieveCompetedRequest(token); - if(req == NULL){ - - std::cerr << "RsGxsDataAccess::getMsgList() Unable to retrieve msg Ids" << std::endl; - return false; - }else if(req->status == COMPLETE){ - - MsgIdReq* mireq = dynamic_cast(req); - - if(mireq) - { - msgIds.swap(mireq->mMsgIdResult); - mireq->mMsgIdResult.clear(); - locked_updateRequestStatus(token, DONE); - } - else{ - std::cerr << "RsGxsDataAccess::getMsgList() Req found, failed caste" << std::endl; - return false; - } - }else{ - std::cerr << "RsGxsDataAccess::getMsgList() Req not ready" << std::endl; + if(req == nullptr) + { + RsErr() << __PRETTY_FUNCTION__ << " Unable to retrieve msg Ids" << std::endl; return false; } + MsgIdReq* mireq = dynamic_cast(req); + if(mireq) + { + msgIds.swap(mireq->mMsgIdResult); + mireq->mMsgIdResult.clear(); + } + else + { + RsErr() << __PRETTY_FUNCTION__ << " Req found, failed cast" << std::endl; + return false; + } + locked_clearRequest(token); return true; } @@ -673,127 +622,135 @@ bool RsGxsDataAccess::getGroupList(const uint32_t& token, std::liststatus == COMPLETE){ - - GroupIdReq* gireq = dynamic_cast(req); - - if(gireq) - { - groupIds.swap(gireq->mGroupIdResult); - gireq->mGroupIdResult.clear(); - locked_updateRequestStatus(token, DONE); - - }else{ - std::cerr << "RsGxsDataAccess::getGroupList() Req found, failed caste" << std::endl; - return false; - } - }else{ - std::cerr << "RsGxsDataAccess::getGroupList() Req not ready" << std::endl; + if(req == nullptr) + { + RsErr() << __PRETTY_FUNCTION__ << " Unable to retrieve group Ids, Request does not exist" << std::endl; return false; } + GroupIdReq* gireq = dynamic_cast(req); + if(gireq) + { + groupIds.swap(gireq->mGroupIdResult); + gireq->mGroupIdResult.clear(); + } + else + { + RsErr() << __PRETTY_FUNCTION__ << " Req found, failed caste" << std::endl; + return false; + } + locked_clearRequest(token); return true; } -GxsRequest* RsGxsDataAccess::locked_retrieveRequest(const uint32_t& token) +bool RsGxsDataAccess::getGroupStatistic(const uint32_t &token, GxsGroupStatistic &grpStatistic) { + RsStackMutex stack(mDataMutex); - if(mRequests.find(token) == mRequests.end()) return NULL; + GxsRequest* req = locked_retrieveCompetedRequest(token); - GxsRequest* req = mRequests[token]; + if(req == nullptr) + { + RsErr() << "RsGxsDataAccess::getGroupStatistic() Unable to retrieve grp stats" << std::endl; + return false; + } + GroupStatisticRequest* gsreq = dynamic_cast(req); - return req; + if(gsreq) + grpStatistic = gsreq->mGroupStatistic; + else + { + RsErr() << __PRETTY_FUNCTION__ << " Req found, failed caste" << std::endl; + return false; + } + locked_clearRequest(token); + return true; +} + +bool RsGxsDataAccess::getServiceStatistic(const uint32_t &token, GxsServiceStatistic &servStatistic) +{ + RsStackMutex stack(mDataMutex); + + GxsRequest* req = locked_retrieveCompetedRequest(token); + + if(req == nullptr) + { + RsErr() << __PRETTY_FUNCTION__ << " Unable to retrieve service stats" << std::endl; + return false; + } + ServiceStatisticRequest* ssreq = dynamic_cast(req); + + if(ssreq) + servStatistic = ssreq->mServiceStatistic; + else + { + RsErr() << __PRETTY_FUNCTION__ << " Req found, failed caste" << std::endl; + return false; + } + locked_clearRequest(token); + return true; +} +GxsRequest* RsGxsDataAccess::locked_retrieveCompetedRequest(const uint32_t& token) +{ + auto it = mCompletedRequests.find(token) ; + + if(it == mCompletedRequests.end()) + return nullptr; + + return it->second; } #define MAX_REQUEST_AGE 120 // 2 minutes void RsGxsDataAccess::processRequests() { - std::list toClear; - rstime_t now = time(NULL); - std::map::iterator it; - - { - RsStackMutex stack(mDataMutex); /******* LOCKED *******/ - - // process status of the requests - for (it = mRequests.begin(); it != mRequests.end(); ++it) - { - GxsRequest* req = it->second; - - switch (req->status) - { - case PENDING: - // process request later - break; - case PARTIAL: - // should not happen - req->status = COMPLETE; - break; - case DONE: -#ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::processrequests() Clearing Done Request Token: " << req->token; - std::cerr << std::endl; -#endif - toClear.push_back(req->token); - break; - case CANCELLED: -#ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::processrequests() Clearing Cancelled Request Token: " << req->token; - std::cerr << std::endl; -#endif - toClear.push_back(req->token); - break; - default: - if (now - req->reqTime > MAX_REQUEST_AGE) - { -#ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::processrequests() Clearing Old Request Token: " << req->token; - std::cerr << std::endl; -#endif - toClear.push_back(req->token); - } - } - } - } // END OF MUTEX. - - // clear requests - std::list::iterator cit; - for (cit = toClear.begin(); cit != toClear.end(); ++cit) - { - clearRequest(*cit); - } - // process requests - while (true) + + while (!mRequestQueue.empty()) { - GxsRequest* req = NULL; + // Extract the first elements from the request queue. cleanup all other elements marked at terminated. + + GxsRequest* req = nullptr; { RsStackMutex stack(mDataMutex); /******* LOCKED *******/ + rstime_t now = time(nullptr); // this is ok while in the loop below - // get the first pending request - for (it = mRequests.begin(); it != mRequests.end(); ++it) - { - GxsRequest* reqCheck = it->second; - if (reqCheck->status == PENDING) - { - req = reqCheck; + while(!mRequestQueue.empty() && req == nullptr) + { + if(now > mRequestQueue.begin()->second->reqTime + MAX_REQUEST_AGE) + { + mRequestQueue.erase(mRequestQueue.begin()); + continue; + } + + switch( mRequestQueue.begin()->second->status ) + { + case PARTIAL: + RsErr() << "Found partial request in mRequestQueue. This is a bug." << std::endl; // fallthrough + case COMPLETE: + case DONE: + case FAILED: + case CANCELLED: +#ifdef DATA_DEBUG + RsDbg() << " request " << mRequestQueue.begin()->second->token << ": status = " << mRequestQueue.begin()->second->status << ": removing from the RequestQueue" << std::endl; +#endif + mRequestQueue.erase(mRequestQueue.begin()); + continue; + break; + case PENDING: + req = mRequestQueue.begin()->second; req->status = PARTIAL; - break; - } - } - } // END OF MUTEX. + mRequestQueue.erase(mRequestQueue.begin()); // remove it right away from the waiting queue. + break; + } - if (!req) { + } + } + + if (!req) break; - } GroupMetaReq* gmr; GroupDataReq* gdr; @@ -808,132 +765,85 @@ void RsGxsDataAccess::processRequests() ServiceStatisticRequest* ssr; #ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::processRequests() Processing Token: " << req->token << " Status: " - << req->status << " ReqType: " << req->reqType << " Age: " - << now - req->reqTime << std::endl; + RsDbg() << "Processing request: " << req->token << " Status: " << req->status << " ReqType: " << req->reqType << " Age: " << time(nullptr) - req->reqTime << std::endl; #endif /* PROCESS REQUEST! */ bool ok = false; - if((gmr = dynamic_cast(req)) != NULL) + if((gmr = dynamic_cast(req)) != nullptr) { ok = getGroupSummary(gmr); } - else if((gdr = dynamic_cast(req)) != NULL) + else if((gdr = dynamic_cast(req)) != nullptr) { ok = getGroupData(gdr); } - else if((gir = dynamic_cast(req)) != NULL) + else if((gir = dynamic_cast(req)) != nullptr) { ok = getGroupList(gir); } - else if((mmr = dynamic_cast(req)) != NULL) + else if((mmr = dynamic_cast(req)) != nullptr) { ok = getMsgSummary(mmr); } - else if((mdr = dynamic_cast(req)) != NULL) + else if((mdr = dynamic_cast(req)) != nullptr) { ok = getMsgData(mdr); } - else if((mir = dynamic_cast(req)) != NULL) + else if((mir = dynamic_cast(req)) != nullptr) { - ok = getMsgList(mir); + ok = getMsgIdList(mir); } - else if((mri = dynamic_cast(req)) != NULL) + else if((mri = dynamic_cast(req)) != nullptr) { ok = getMsgRelatedInfo(mri); } - else if((gsr = dynamic_cast(req)) != NULL) + else if((gsr = dynamic_cast(req)) != nullptr) { ok = getGroupStatistic(gsr); } - else if((ssr = dynamic_cast(req)) != NULL) + else if((ssr = dynamic_cast(req)) != nullptr) { ok = getServiceStatistic(ssr); } - else if((grr = dynamic_cast(req)) != NULL) + else if((grr = dynamic_cast(req)) != nullptr) { ok = getGroupSerializedData(grr); } - else - { - std::cerr << "RsGxsDataAccess::processRequests() Failed to process request, token: " - << req->token << std::endl; - } + RsErr() << __PRETTY_FUNCTION__ << " Failed to process request, token: " << req->token << std::endl; + // We cannot easily remove the request here because the queue may have more elements now and mRequestQueue.begin() is not necessarily the same element. + // but we mark it as COMPLETE/FAILED so that it will be removed in the next loop. { RsStackMutex stack(mDataMutex); /******* LOCKED *******/ - if (req->status == PARTIAL) - { - req->status = ok ? COMPLETE : FAILED; - } - } // END OF MUTEX. - } -} -bool RsGxsDataAccess::getGroupStatistic(const uint32_t &token, GxsGroupStatistic &grpStatistic) -{ - RsStackMutex stack(mDataMutex); + if(ok) + { + // When the request is complete, we move it to the complete list, so that the caller can easily retrieve the request data - GxsRequest* req = locked_retrieveRequest(token); - - if(req == NULL){ - - std::cerr << "RsGxsDataAccess::getGroupStatistic() Unable to retrieve grp stats" << std::endl; - return false; - }else if(req->status == COMPLETE){ - - GroupStatisticRequest* gsreq = dynamic_cast(req); - - if(gsreq) - { - grpStatistic = gsreq->mGroupStatistic; - locked_updateRequestStatus(token, DONE); - } - else{ - std::cerr << "RsGxsDataAccess::getGroupStatistic() Req found, failed caste" << std::endl; - return false; +#ifdef DATA_DEBUG + RsDbg() << " Request completed successfully. Marking as COMPLETE." << std::endl; +#endif + req->status = COMPLETE ; + mCompletedRequests[req->token] = req; + mPublicToken[req->token] = COMPLETE; + } + else + { + req->status = FAILED; + mPublicToken[req->token] = FAILED; +#ifdef DATA_DEBUG + RsDbg() << " Request failed. Marking as FAILED." << std::endl; +#endif + } } - }else{ - std::cerr << "RsGxsDataAccess::getGroupStatistic() Req not ready" << std::endl; - return false; - } - return true; + } // END OF MUTEX. } -bool RsGxsDataAccess::getServiceStatistic(const uint32_t &token, GxsServiceStatistic &servStatistic) -{ - RsStackMutex stack(mDataMutex); - GxsRequest* req = locked_retrieveRequest(token); - - if(req == NULL){ - - std::cerr << "RsGxsDataAccess::getServiceStatistic() Unable to retrieve service stats" << std::endl; - return false; - }else if(req->status == COMPLETE){ - - ServiceStatisticRequest* ssreq = dynamic_cast(req); - - if(ssreq) - { - servStatistic = ssreq->mServiceStatistic; - locked_updateRequestStatus(token, DONE); - } - else{ - std::cerr << "RsGxsDataAccess::getServiceStatistic() Req found, failed caste" << std::endl; - return false; - } - }else{ - std::cerr << "RsGxsDataAccess::getServiceStatistic() Req not ready" << std::endl; - return false; - } - - return true; -} bool RsGxsDataAccess::getGroupSerializedData(GroupSerializedDataReq* req) { @@ -947,7 +857,7 @@ bool RsGxsDataAccess::getGroupSerializedData(GroupSerializedDataReq* req) for(std::list::iterator lit = grpIdsOut.begin();lit != grpIdsOut.end();++lit) - grpData[*lit] = NULL; + grpData[*lit] = nullptr; bool ok = mDataStore->retrieveNxsGrps(grpData, true, true); req->mGroupData.clear(); @@ -974,7 +884,7 @@ bool RsGxsDataAccess::getGroupData(GroupDataReq* req) for(; lit != lit_end; ++lit) { - grpData[*lit] = NULL; + grpData[*lit] = nullptr; } bool ok = mDataStore->retrieveNxsGrps(grpData, true, true); @@ -1000,7 +910,7 @@ bool RsGxsDataAccess::getGroupSummary(GroupMetaReq* req) std::list::const_iterator lit = grpIdsOut.begin(); for(; lit != grpIdsOut.end(); ++lit) - grpMeta[*lit] = NULL; + grpMeta[*lit] = nullptr; mDataStore->retrieveGxsGrpMetaData(grpMeta); @@ -1039,7 +949,7 @@ bool RsGxsDataAccess::getMsgData(MsgDataReq* req) const RsTokReqOptions& opts(req->Options); // filter based on options - getMsgList(req->mMsgIds, opts, msgIdOut); + getMsgIdList(req->mMsgIds, opts, msgIdOut); // If the list is empty because of filtering do not retrieve from DB if((opts.mMsgFlagMask || opts.mStatusMask) && msgIdOut.empty()) @@ -1057,24 +967,21 @@ bool RsGxsDataAccess::getMsgSummary(MsgMetaReq* req) const RsTokReqOptions& opts(req->Options); // filter based on options - getMsgList(req->mMsgIds, opts, msgIdOut); + getMsgMetaDataList(req->mMsgIds, opts, req->mMsgMetaData); - // If the list is empty because of filtering do not retrieve from DB - if((opts.mMsgFlagMask || opts.mStatusMask) && msgIdOut.empty()) - return true; - - mDataStore->retrieveGxsMsgMetaData(msgIdOut, req->mMsgMetaData); +// // If the list is empty because of filtering do not retrieve from DB +// if((opts.mMsgFlagMask || opts.mStatusMask) && msgIdOut.empty()) +// return true; +// +// mDataStore->retrieveGxsMsgMetaData(msgIdOut, req->mMsgMetaData); return true; } - -bool RsGxsDataAccess::getMsgList( - const GxsMsgReq& msgIds, const RsTokReqOptions& opts, - GxsMsgReq& msgIdsOut ) +bool RsGxsDataAccess::getMsgMetaDataList( const GxsMsgReq& msgIds, const RsTokReqOptions& opts, GxsMsgMetaResult& result ) { - GxsMsgMetaResult result; - + // First get all message metas, then filter out the ones we want to keep. + result.clear(); mDataStore->retrieveGxsMsgMetaData(msgIds, result); /* CASEs this handles. @@ -1083,8 +990,7 @@ bool RsGxsDataAccess::getMsgList( * */ #ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::getMsgList()"; - std::cerr << std::endl; + RsDbg() << "RsGxsDataAccess::getMsgList()" << std::endl; #endif bool onlyOrigMsgs = false; @@ -1095,16 +1001,14 @@ bool RsGxsDataAccess::getMsgList( if (opts.mOptions & RS_TOKREQOPT_MSG_ORIGMSG) { #ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::getMsgList() MSG_ORIGMSG"; - std::cerr << std::endl; + RsDbg() << "RsGxsDataAccess::getMsgList() MSG_ORIGMSG" << std::endl; #endif onlyOrigMsgs = true; } else if (opts.mOptions & RS_TOKREQOPT_MSG_LATEST) { #ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::getMsgList() MSG_LATEST"; - std::cerr << std::endl; + RsDbg() << "RsGxsDataAccess::getMsgList() MSG_LATEST" << std::endl; #endif onlyLatestMsgs = true; } @@ -1112,130 +1016,176 @@ bool RsGxsDataAccess::getMsgList( if (opts.mOptions & RS_TOKREQOPT_MSG_THREAD) { #ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::getMsgList() MSG_THREAD"; - std::cerr << std::endl; + RsDbg() << "RsGxsDataAccess::getMsgList() MSG_THREAD" << std::endl; #endif onlyThreadHeadMsgs = true; } GxsMsgMetaResult::iterator meta_it; - MsgMetaFilter metaFilter; for(meta_it = result.begin(); meta_it != result.end(); ++meta_it) { const RsGxsGroupId& grpId = meta_it->first; - metaFilter[grpId] = std::map(); + //auto& filter( metaFilter[grpId] ); // does the initialization of metaFilter[grpId] and avoids further O(log(n)) calls - const std::vector& metaV = meta_it->second; - if (onlyLatestMsgs) // THIS ONE IS HARD -> LOTS OF COMP. + std::vector& metaV = meta_it->second; + + if (onlyLatestMsgs) // if we only consider latest messages, we need to first filter out messages with "children" { - std::vector::const_iterator vit = metaV.begin(); + // The strategy is the following: for each msg we only know its direct ancestor. So we build a map to be able to find for a given message + // which messages derive from it. + // Then when this map is fuly build, we follow this map and every message that has no direct follow up will be kept. + // Because msgs are stored in a std::vector we build a map to convert each vector to its position in metaV. + std::vector keep(metaV.size(),true); // this vector will tell wether we keep or not a given Meta + std::map index_in_metaV; // holds the index of each group Id in metaV + + for(uint32_t i=0;imMsgId] = i; + + // Now loop once over message Metas and see if they have a parent. If yes, then mark the parent to be discarded. + + for(uint32_t i=0;imParentId.isNull() && metaV[i]->mParentId != metaV[i]->mMsgId) // this one is a follow up + { + auto it = index_in_metaV.find(metaV[i]->mParentId); + + if(it != index_in_metaV.end()) + keep[it->second] = false; + else + std::cerr << "Found a msg that has a parent that is not locally known. Not an error anyway." << std::endl; + + } + + // Finally we just discard the messages for which the keep flag has been set to false. + + for(uint32_t i=0;i TS. std::map > origMsgTs; - std::map >::iterator oit; - for(; vit != metaV.end(); ++vit) + for(uint32_t i=0;imParentId.isNull())) { - if (!(msgMeta->mParentId.isNull())) + delete msgMeta; + metaV[i] = nullptr; + continue; + } + + auto oit = origMsgTs.find(msgMeta->mOrigMsgId); + + bool addMsg = false; + if (oit != origMsgTs.end()) + { + if(oit->second.second > msgMeta->mPublishTs) { - continue; +#ifdef DATA_DEBUG + std::cerr << "RsGxsDataAccess::getMsgList() Found New OrigMsgId: "; + std::cerr << msgMeta->mOrigMsgId; + std::cerr << " MsgId: " << msgMeta->mMsgId; + std::cerr << " TS: " << msgMeta->mPublishTs; + std::cerr << std::endl; +#endif + origMsgTs[msgMeta->mOrigMsgId] = std::make_pair(msgMeta->mMsgId, msgMeta->mPublishTs); // add as latest. (overwriting if necessary) } } - - - oit = origMsgTs.find(msgMeta->mOrigMsgId); - bool addMsg = false; - if (oit == origMsgTs.end()) + else { -#ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::getMsgList() Found New OrigMsgId: "; - std::cerr << msgMeta->mOrigMsgId; - std::cerr << " MsgId: " << msgMeta->mMsgId; - std::cerr << " TS: " << msgMeta->mPublishTs; - std::cerr << std::endl; -#endif - - addMsg = true; - } - // check timestamps. - else if (oit->second.second < msgMeta->mPublishTs) - { -#ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::getMsgList() Found Later Msg. OrigMsgId: "; - std::cerr << msgMeta->mOrigMsgId; - std::cerr << " MsgId: " << msgMeta->mMsgId; - std::cerr << " TS: " << msgMeta->mPublishTs; -#endif - - addMsg = true; + delete msgMeta; + metaV[i] = nullptr; + continue; } - if (addMsg) - { - // add as latest. (overwriting if necessary) - origMsgTs[msgMeta->mOrigMsgId] = std::make_pair(msgMeta->mMsgId, msgMeta->mPublishTs); - metaFilter[grpId].insert(std::make_pair(msgMeta->mMsgId, msgMeta)); - } + } // Add the discovered Latest Msgs. - for(oit = origMsgTs.begin(); oit != origMsgTs.end(); ++oit) + for(auto oit = origMsgTs.begin(); oit != origMsgTs.end(); ++oit) { - msgIdsOut[grpId].insert(oit->second.first); + msgIdsOut[grpId].insert(oit->second.first); } +#endif - } - else // ALL OTHER CASES. - { - std::vector::const_iterator vit = metaV.begin(); + for(uint32_t i=0;imParentId.isNull()) + { + delete msgMeta; + metaV[i] = nullptr; + continue; + } - /* if we are grabbing thread Head... then parentId == empty. */ - if (onlyThreadHeadMsgs) - { - if (!(msgMeta->mParentId.isNull())) - { - continue; - } - } - - - if (onlyOrigMsgs) - { - if (msgMeta->mMsgId == msgMeta->mOrigMsgId) - { - add = true; - } - } - else - { - add = true; - } - - if (add) - { - msgIdsOut[grpId].insert(msgMeta->mMsgId); - metaFilter[grpId].insert(std::make_pair(msgMeta->mMsgId, msgMeta)); - } - - } - } + if (onlyOrigMsgs && !msgMeta->mOrigMsgId.isNull() && msgMeta->mMsgId != msgMeta->mOrigMsgId) + { + delete msgMeta; + metaV[i] = nullptr; + continue; + } + } } - filterMsgList(msgIdsOut, opts, metaFilter); + // collapse results while keeping the order, eliminating empty slots - metaFilter.clear(); + for(auto it(result.begin());it!=result.end();++it) + { + uint32_t j=0; // j is the end of the cleaned-up tab, at the first available place + + for(uint32_t i=0;isecond.size();++i) // i is the index in the tab possibly containing nullptr's + if(it->second[i] != nullptr) + { + it->second[j] = it->second[i]; // move the pointer to the first available place + ++j; + } + + it->second.resize(j); // normally all pointers have been moved forward so there is nothing to delete here. + } + + // filterMsgIdList(msgIdsOut, opts, metaFilter); // this call is absurd: we already have in metaFilter the content we want. + + //metaFilter.clear(); + + // delete meta data + //cleanseMsgMetaMap(result); + + return true; +} + +bool RsGxsDataAccess::getMsgIdList( const GxsMsgReq& msgIds, const RsTokReqOptions& opts, GxsMsgReq& msgIdsOut ) +{ + GxsMsgMetaResult result; + + getMsgMetaDataList( msgIds, opts, result ); + + // extract MessageIds + + msgIdsOut.clear(); + + for(auto it(result.begin());it!=result.end();++it) + { + auto& id_set(msgIdsOut[it->first]); + + for(uint32_t i=0;isecond.size();++i) + id_set.insert(it->second[i]->mMsgId); + } // delete meta data cleanseMsgMetaMap(result); @@ -1250,8 +1200,7 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req) * 1) No Flags => return nothing */ #ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::getMsgRelatedList()"; - std::cerr << std::endl; + RsDbg() << "RsGxsDataAccess::getMsgRelatedList()" << std::endl; #endif const RsTokReqOptions& opts = req->Options; @@ -1264,16 +1213,14 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req) if (opts.mOptions & RS_TOKREQOPT_MSG_LATEST) { #ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::getMsgRelatedList() MSG_LATEST"; - std::cerr << std::endl; + RsDbg() << "RsGxsDataAccess::getMsgRelatedList() MSG_LATEST" << std::endl; #endif onlyLatestMsgs = true; } else if (opts.mOptions & RS_TOKREQOPT_MSG_VERSIONS) { #ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::getMsgRelatedList() MSG_VERSIONS"; - std::cerr << std::endl; + RsDbg() << "RsGxsDataAccess::getMsgRelatedList() MSG_VERSIONS" << std::endl; #endif onlyAllVersions = true; } @@ -1281,8 +1228,7 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req) if (opts.mOptions & RS_TOKREQOPT_MSG_PARENT) { #ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::getMsgRelatedList() MSG_PARENTS"; - std::cerr << std::endl; + RsDbg() << "RsGxsDataAccess::getMsgRelatedList() MSG_PARENTS" << std::endl; #endif onlyChildMsgs = true; } @@ -1290,8 +1236,7 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req) if (opts.mOptions & RS_TOKREQOPT_MSG_THREAD) { #ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::getMsgRelatedList() MSG_THREAD"; - std::cerr << std::endl; + RsDbg() << "RsGxsDataAccess::getMsgRelatedList() MSG_THREAD" << std::endl; #endif onlyThreadMsgs = true; } @@ -1299,8 +1244,7 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req) if (onlyAllVersions && onlyChildMsgs) { #ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::getMsgRelatedList() ERROR Incompatible FLAGS (VERSIONS & PARENT)"; - std::cerr << std::endl; + RsDbg() << "RsGxsDataAccess::getMsgRelatedList() ERROR Incompatible FLAGS (VERSIONS & PARENT)" << std::endl; #endif return false; @@ -1309,8 +1253,7 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req) if (onlyAllVersions && onlyThreadMsgs) { #ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::getMsgRelatedList() ERROR Incompatible FLAGS (VERSIONS & THREAD)"; - std::cerr << std::endl; + RsDbg() << "RsGxsDataAccess::getMsgRelatedList() ERROR Incompatible FLAGS (VERSIONS & THREAD)" << std::endl; #endif return false; @@ -1319,8 +1262,7 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req) if ((!onlyLatestMsgs) && onlyChildMsgs) { #ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::getMsgRelatedList() ERROR Incompatible FLAGS (!LATEST & PARENT)"; - std::cerr << std::endl; + RsDbg() << "RsGxsDataAccess::getMsgRelatedList() ERROR Incompatible FLAGS (!LATEST & PARENT)" << std::endl; #endif return false; @@ -1329,8 +1271,7 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req) if ((!onlyLatestMsgs) && onlyThreadMsgs) { #ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::getMsgRelatedList() ERROR Incompatible FLAGS (!LATEST & THREAD)"; - std::cerr << std::endl; + RsDbg() << "RsGxsDataAccess::getMsgRelatedList() ERROR Incompatible FLAGS (!LATEST & THREAD)" << std::endl; #endif return false; @@ -1339,8 +1280,7 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req) if (onlyChildMsgs && onlyThreadMsgs) { #ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::getMsgRelatedList() ERROR Incompatible FLAGS (PARENT & THREAD)"; - std::cerr << std::endl; + RsDbg() << "RsGxsDataAccess::getMsgRelatedList() ERROR Incompatible FLAGS (PARENT & THREAD)" << std::endl; #endif return false; @@ -1351,8 +1291,7 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req) if ((!onlyLatestMsgs) && (!onlyAllVersions) && (!onlyChildMsgs) && (!onlyThreadMsgs)) { #ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::getMsgRelatedList() FALLBACK -> NO FLAGS -> SIMPLY RETURN nothing"; - std::cerr << std::endl; + RsDbg() << "RsGxsDataAccess::getMsgRelatedList() FALLBACK -> NO FLAGS -> SIMPLY RETURN nothing" << std::endl; #endif return true; @@ -1381,7 +1320,7 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req) std::set outMsgIds; - RsGxsMsgMetaData* origMeta = NULL; + RsGxsMsgMetaData* origMeta = nullptr; for(vit_meta = metaV.begin(); vit_meta != metaV.end(); ++vit_meta) { RsGxsMsgMetaData* meta = *vit_meta; @@ -1396,7 +1335,7 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req) if(!origMeta) { #ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::getMsgRelatedInfo(): Cannot find meta of msgId (to relate to)!" + RsDbg() << "RsGxsDataAccess::getMsgRelatedInfo(): Cannot find meta of msgId (to relate to)!" << std::endl; #endif cleanseMsgMetaMap(result); @@ -1441,11 +1380,11 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req) if (oit == origMsgTs.end()) { #ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::getMsgRelatedList() Found New OrigMsgId: "; - std::cerr << meta->mOrigMsgId; - std::cerr << " MsgId: " << meta->mMsgId; - std::cerr << " TS: " << meta->mPublishTs; - std::cerr << std::endl; + RsDbg() << "RsGxsDataAccess::getMsgRelatedList() Found New OrigMsgId: " + << meta->mOrigMsgId + << " MsgId: " << meta->mMsgId + << " TS: " << meta->mPublishTs + << std::endl; #endif addMsg = true; @@ -1454,10 +1393,11 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req) else if (oit->second.second < meta->mPublishTs) { #ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::getMsgRelatedList() Found Later Msg. OrigMsgId: "; - std::cerr << meta->mOrigMsgId; - std::cerr << " MsgId: " << meta->mMsgId; - std::cerr << " TS: " << meta->mPublishTs; + RsDbg() << "RsGxsDataAccess::getMsgRelatedList() Found Later Msg. OrigMsgId: " + << meta->mOrigMsgId + << " MsgId: " << meta->mMsgId + << " TS: " << meta->mPublishTs + << std::endl; #endif addMsg = true; @@ -1483,7 +1423,7 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req) /* first guess is potentially better than Orig (can't be worse!) */ rstime_t latestTs = 0; RsGxsMessageId latestMsgId; - RsGxsMsgMetaData* latestMeta=NULL; + RsGxsMsgMetaData* latestMeta=nullptr; for(vit_meta = metaV.begin(); vit_meta != metaV.end(); ++vit_meta) { @@ -1519,7 +1459,7 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req) GxsMsgIdResult filteredOutMsgIds; filteredOutMsgIds[grpId] = outMsgIds; - filterMsgList(filteredOutMsgIds, opts, filterMap); + filterMsgIdList(filteredOutMsgIds, opts, filterMap); if(!filteredOutMsgIds[grpId].empty()) { @@ -1557,7 +1497,7 @@ bool RsGxsDataAccess::getGroupStatistic(GroupStatisticRequest *req) GxsMsgMetaResult metaResult; mDataStore->retrieveGxsMsgMetaData(metaReq, metaResult); - std::vector& msgMetaV = metaResult[req->mGrpId]; + const std::vector& msgMetaV = metaResult[req->mGrpId]; req->mGroupStatistic.mGrpId = req->mGrpId; req->mGroupStatistic.mNumMsgs = msgMetaV.size(); @@ -1649,7 +1589,7 @@ bool RsGxsDataAccess::getServiceStatistic(ServiceStatisticRequest *req) return true; } -bool RsGxsDataAccess::getMsgList(MsgIdReq* req) +bool RsGxsDataAccess::getMsgIdList(MsgIdReq* req) { GxsMsgMetaResult result; @@ -1677,7 +1617,7 @@ bool RsGxsDataAccess::getMsgList(MsgIdReq* req) GxsMsgReq msgIdOut; // filter based on options - getMsgList(req->mMsgIdResult, req->Options, msgIdOut); + getMsgIdList(req->mMsgIdResult, req->Options, msgIdOut); req->mMsgIdResult = msgIdOut; return true; @@ -1702,12 +1642,9 @@ void RsGxsDataAccess::cleanseMsgMetaMap(GxsMsgMetaResult& result) return; } -void RsGxsDataAccess::filterMsgList( - GxsMsgIdResult& resultsMap, const RsTokReqOptions& opts, - const MsgMetaFilter& msgMetas ) const +void RsGxsDataAccess::filterMsgIdList( GxsMsgIdResult& resultsMap, const RsTokReqOptions& opts, const MsgMetaFilter& msgMetas ) const { - for( GxsMsgIdResult::iterator grpIt = resultsMap.begin(); - grpIt != resultsMap.end(); ++grpIt ) + for( GxsMsgIdResult::iterator grpIt = resultsMap.begin(); grpIt != resultsMap.end(); ++grpIt ) { const RsGxsGroupId& groupId(grpIt->first); std::set& msgsIdSet(grpIt->second); @@ -1715,13 +1652,12 @@ void RsGxsDataAccess::filterMsgList( MsgMetaFilter::const_iterator cit = msgMetas.find(groupId); if(cit == msgMetas.end()) continue; #ifdef DATA_DEBUG - std::cerr << __PRETTY_FUNCTION__ << " " << msgsIdSet.size() + RsDbg() << __PRETTY_FUNCTION__ << " " << msgsIdSet.size() << " for group: " << groupId << " before filtering" << std::endl; #endif - for( std::set::iterator msgIdIt = msgsIdSet.begin(); - msgIdIt != msgsIdSet.end(); ) + for( std::set::iterator msgIdIt = msgsIdSet.begin(); msgIdIt != msgsIdSet.end(); ) { const RsGxsMessageId& msgId(*msgIdIt); const std::map& msgsMetaMap = @@ -1735,12 +1671,14 @@ void RsGxsDataAccess::filterMsgList( keep = checkMsgFilter(opts, msgsMetaMapIt->second); } - if(keep) ++msgIdIt; - else msgIdIt = msgsIdSet.erase(msgIdIt); + if(keep) + ++msgIdIt; + else + msgIdIt = msgsIdSet.erase(msgIdIt); } #ifdef DATA_DEBUG - std::cerr << __PRETTY_FUNCTION__ << " " << msgsIdSet.size() + RsDbg() << __PRETTY_FUNCTION__ << " " << msgsIdSet.size() << " for group: " << groupId << " after filtering" << std::endl; #endif @@ -1774,23 +1712,44 @@ void RsGxsDataAccess::filterGrpList(std::list &grpIds, const RsTok } -bool RsGxsDataAccess::checkRequestStatus( - uint32_t token, GxsRequestStatus& status, uint32_t& reqtype, - uint32_t& anstype, rstime_t& ts ) +bool RsGxsDataAccess::checkRequestStatus( uint32_t token, GxsRequestStatus& status, uint32_t& reqtype, uint32_t& anstype, rstime_t& ts ) { RS_STACK_MUTEX(mDataMutex); - GxsRequest* req = locked_retrieveRequest(token); + GxsRequest* req = locked_retrieveCompetedRequest(token); - if (req == NULL || req->status == CANCELLED) - return false; +#ifdef DATA_DEBUG + RsDbg() << "CheckRequestStatus: token=" << token ; +#endif - anstype = req->ansType; - reqtype = req->reqType; - status = req->status; - ts = req->reqTime; + if(req != nullptr) + { + anstype = req->clientAnswerType; + reqtype = req->reqType; + status = COMPLETE; + ts = req->reqTime; +#ifdef DATA_DEBUG + RsDbg() << __PRETTY_FUNCTION__ << " Returning status = COMPLETE" << std::endl; +#endif + return true; + } - return true; + auto it = mPublicToken.find(token); + + if(it != mPublicToken.end()) + { + status = it->second; +#ifdef DATA_DEBUG + RsDbg() << __PRETTY_FUNCTION__ << " Returning status = " << status << std::endl; +#endif + return true; + } + + status = FAILED; +#ifdef DATA_DEBUG + RsDbg() << " Token not found. Returning FAILED" << std::endl; +#endif + return false; } bool RsGxsDataAccess::addGroupData(RsNxsGrp* grp) { @@ -1817,7 +1776,7 @@ bool RsGxsDataAccess::getGroupData(const RsGxsGroupId& grpId, RsNxsGrp *& grp_da std::map grps ; - grps[grpId] = NULL ; + grps[grpId] = nullptr ; if(mDataStore->retrieveNxsGrps(grps, false, true)) // the false here is very important: it removes the private key parts. { @@ -1844,18 +1803,16 @@ void RsGxsDataAccess::tokenList(std::list& tokens) RsStackMutex stack(mDataMutex); - std::map::iterator mit = mRequests.begin(); + for(auto& it:mRequestQueue) + tokens.push_back(it.second->token); - for(; mit != mRequests.end(); ++mit) - { - tokens.push_back(mit->first); - } + for(auto& it:mCompletedRequests) + tokens.push_back(it.first); } -bool RsGxsDataAccess::locked_updateRequestStatus( - uint32_t token, RsTokenService::GxsRequestStatus status ) +bool RsGxsDataAccess::locked_updateRequestStatus( uint32_t token, RsTokenService::GxsRequestStatus status ) { - GxsRequest* req = locked_retrieveRequest(token); + GxsRequest* req = locked_retrieveCompetedRequest(token); if(req) req->status = status; else return false; @@ -1870,7 +1827,7 @@ uint32_t RsGxsDataAccess::generatePublicToken() { RS_STACK_MUTEX(mDataMutex); - mPublicToken[token] = RsTokenService::PENDING; + mPublicToken[token] = PENDING ; } return token; @@ -1878,15 +1835,19 @@ uint32_t RsGxsDataAccess::generatePublicToken() -bool RsGxsDataAccess::updatePublicRequestStatus( - uint32_t token, RsTokenService::GxsRequestStatus status ) +bool RsGxsDataAccess::updatePublicRequestStatus( uint32_t token, RsTokenService::GxsRequestStatus status ) { RS_STACK_MUTEX(mDataMutex); - std::map::iterator mit = - mPublicToken.find(token); - if(mit != mPublicToken.end()) mit->second = status; - else return false; - return true; + + auto mit = mPublicToken.find(token); + + if(mit != mPublicToken.end()) + { + mit->second = status; + return true; + } + else + return false; } @@ -1894,11 +1855,14 @@ bool RsGxsDataAccess::updatePublicRequestStatus( bool RsGxsDataAccess::disposeOfPublicToken(uint32_t token) { RS_STACK_MUTEX(mDataMutex); - std::map::iterator mit = - mPublicToken.find(token); - if(mit != mPublicToken.end()) mPublicToken.erase(mit); - else return false; - return true; + auto mit = mPublicToken.find(token); + if(mit != mPublicToken.end()) + { + mPublicToken.erase(mit); + return true; + } + else + return false; } bool RsGxsDataAccess::checkGrpFilter(const RsTokReqOptions &opts, const RsGxsGrpMetaData *meta) const @@ -1931,7 +1895,7 @@ bool RsGxsDataAccess::checkMsgFilter( (opts.mStatusMask & meta->mMsgStatus) ) { #ifdef DATA_DEBUG - std::cerr << __PRETTY_FUNCTION__ + RsDbg() << __PRETTY_FUNCTION__ << " Continue checking Msg as StatusMatches: " << " Mask: " << opts.mStatusMask << " StatusFilter: " << opts.mStatusFilter @@ -1942,7 +1906,7 @@ bool RsGxsDataAccess::checkMsgFilter( else { #ifdef DATA_DEBUG - std::cerr << __PRETTY_FUNCTION__ + RsDbg() << __PRETTY_FUNCTION__ << " Dropping Msg due to !StatusMatch " << " Mask: " << opts.mStatusMask << " StatusFilter: " << opts.mStatusFilter @@ -1956,7 +1920,7 @@ bool RsGxsDataAccess::checkMsgFilter( else { #ifdef DATA_DEBUG - std::cerr << __PRETTY_FUNCTION__ + RsDbg() << __PRETTY_FUNCTION__ << " Status check not requested" << " mStatusMask: " << opts.mStatusMask << " MsgId: " << meta->mMsgId << std::endl; @@ -1970,7 +1934,7 @@ bool RsGxsDataAccess::checkMsgFilter( (opts.mMsgFlagMask & meta->mMsgFlags) ) { #ifdef DATA_DEBUG - std::cerr << __PRETTY_FUNCTION__ + RsDbg() << __PRETTY_FUNCTION__ << " Accepting Msg as FlagMatches: " << " Mask: " << opts.mMsgFlagMask << " FlagFilter: " << opts.mMsgFlagFilter @@ -1981,7 +1945,7 @@ bool RsGxsDataAccess::checkMsgFilter( else { #ifdef DATA_DEBUG - std::cerr << __PRETTY_FUNCTION__ + RsDbg() << __PRETTY_FUNCTION__ << " Dropping Msg due to !FlagMatch " << " Mask: " << opts.mMsgFlagMask << " FlagFilter: " << opts.mMsgFlagFilter @@ -1995,7 +1959,7 @@ bool RsGxsDataAccess::checkMsgFilter( else { #ifdef DATA_DEBUG - std::cerr << __PRETTY_FUNCTION__ + RsDbg() << __PRETTY_FUNCTION__ << " Flags check not requested" << " mMsgFlagMask: " << opts.mMsgFlagMask << " MsgId: " << meta->mMsgId << std::endl; diff --git a/libretroshare/src/gxs/rsgxsdataaccess.h b/libretroshare/src/gxs/rsgxsdataaccess.h index c5bfdf084..21cb89b24 100644 --- a/libretroshare/src/gxs/rsgxsdataaccess.h +++ b/libretroshare/src/gxs/rsgxsdataaccess.h @@ -22,6 +22,7 @@ #ifndef RSGXSDATAACCESS_H #define RSGXSDATAACCESS_H +#include #include "retroshare/rstokenservice.h" #include "rsgxsrequesttypes.h" #include "rsgds.h" @@ -30,6 +31,8 @@ typedef std::map< RsGxsGroupId, std::map > MsgMetaFilter; typedef std::map< RsGxsGroupId, RsGxsGrpMetaData* > GrpMetaFilter; +bool operator<(const std::pair& p1,const std::pair& p2); + class RsGxsDataAccess : public RsTokenService { public: @@ -56,7 +59,7 @@ public: * @param groupIds group id to request info for * @return */ - bool requestGroupInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts, const std::list &groupIds); + bool requestGroupInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts, const std::list &groupIds) override; /*! * Use this to request all group related info @@ -65,7 +68,7 @@ public: * @param opts Additional option that affect outcome of request. Please see specific services, for valid values * @return */ - bool requestGroupInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts); + bool requestGroupInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts) override; /*! * Use this to get msg information (id, meta, or data), store token value to poll for request completion @@ -75,7 +78,7 @@ public: * @param groupIds The ids of the groups to get, second entry of map empty to query for all msgs * @return true if request successful false otherwise */ - bool requestMsgInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts, const GxsMsgReq& msgIds); + bool requestMsgInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts, const GxsMsgReq& msgIds) override; /*! * Use this to get message information (id, meta, or data), store token value to poll for request completion @@ -86,7 +89,7 @@ public: * all messages for all groups are retrieved * @return true if request successful false otherwise */ - bool requestMsgInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts, const std::list& grpIds); + bool requestMsgInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts, const std::list& grpIds) override; /*! * For requesting msgs related to a given msg id within a group @@ -96,7 +99,7 @@ public: * @param groupIds The ids of the groups to get, second entry of map empty to query for all msgs * @return true if request successful false otherwise */ - bool requestMsgRelatedInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts, const std::vector &msgIds); + bool requestMsgRelatedInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts, const std::vector &msgIds) override; /*! * This request statistics on amount of data held @@ -107,19 +110,20 @@ public: * total size of messages * total size of groups * @param token + * @param opts Additional option that affect outcome of request. Please see specific services, for valid values */ - void requestServiceStatistic(uint32_t& token); + void requestServiceStatistic(uint32_t& token, const RsTokReqOptions &opts) override; /*! * To request statistic on a group * @param token set to value to be redeemed to get statistic * @param grpId the id of the group + * @param opts Additional option that affect outcome of request. Please see specific services, for valid values */ - void requestGroupStatistic(uint32_t& token, const RsGxsGroupId& grpId); - + void requestGroupStatistic(uint32_t& token, const RsGxsGroupId& grpId, const RsTokReqOptions &opts) override; /* Poll */ - GxsRequestStatus requestStatus(const uint32_t token); + GxsRequestStatus requestStatus(uint32_t token); /* Cancel Request */ bool cancelRequest(const uint32_t &token); @@ -200,7 +204,8 @@ public: * @param token request token to be redeemed * @param msgIds */ - bool getMsgList(const uint32_t &token, GxsMsgIdResult &msgIds); + bool getMsgIdList(const uint32_t &token, GxsMsgIdResult &msgIds); + /*! * Retrieve msg list for a given token for message related info @@ -271,7 +276,7 @@ private: * @param token the value of the token for the request object handle wanted * @return the request associated to this token */ - GxsRequest* locked_retrieveRequest(const uint32_t& token); + GxsRequest* locked_retrieveCompetedRequest(const uint32_t& token); /*! * Add a gxs request to queue @@ -378,8 +383,18 @@ private: * @param req * @return false if unsuccessful, true otherwise */ - bool getMsgList(MsgIdReq* req); + bool getMsgIdList(MsgIdReq* req); + /*! + * Attempts to retrieve msg Meta list from data store + * Computationally/CPU-Bandwidth expensive + * + * @param msgIds List of message Ids for the Message Metas to retrieve + * @param opts GxsRequest options + * @param result Map of Meta information for messages + * + */ + bool getMsgMetaDataList( const GxsMsgReq& msgIds, const RsTokReqOptions& opts, GxsMsgMetaResult& result ); /*! * Attempts to retrieve group meta data from data store @@ -445,7 +460,7 @@ private: * @param opts the request options set by user * @param meta The accompanying meta information for msg, ids */ - void filterMsgList(GxsMsgIdResult& msgIds, const RsTokReqOptions& opts, const MsgMetaFilter& meta) const; + void filterMsgIdList(GxsMsgIdResult& msgIds, const RsTokReqOptions& opts, const MsgMetaFilter& meta) const; /*! * This filter msgs based of options supplied (at the moment just status masks) @@ -482,9 +497,10 @@ private: * @param opts the options used to parameterise the id filter * @param msgIdsOut the left overs ids after filter is applied to msgIds */ - bool getMsgList(const GxsMsgReq& msgIds, const RsTokReqOptions& opts, GxsMsgReq& msgIdsOut); + bool getMsgIdList(const GxsMsgReq& msgIds, const RsTokReqOptions& opts, GxsMsgReq& msgIdsOut); private: + bool locked_clearRequest(const uint32_t &token); RsGeneralDataService* mDataStore; @@ -492,10 +508,9 @@ private: uint32_t mNextToken; std::map mPublicToken; - std::map mRequests; - - + std::set > mRequestQueue; + std::map mCompletedRequests; }; #endif // RSGXSDATAACCESS_H diff --git a/libretroshare/src/gxs/rsgxsrequesttypes.cc b/libretroshare/src/gxs/rsgxsrequesttypes.cc index d57b4d353..6c3925974 100644 --- a/libretroshare/src/gxs/rsgxsrequesttypes.cc +++ b/libretroshare/src/gxs/rsgxsrequesttypes.cc @@ -23,6 +23,128 @@ #include "rsgxsrequesttypes.h" #include "util/rsstd.h" +std::ostream& operator<<(std::ostream& o,const GxsRequest& g) +{ + return g.print(o); +} + + +std::ostream& GroupMetaReq::print(std::ostream& o) const +{ + o << "[Request type=GroupMeta groupIds (size=" << mGroupIds.size() << "): " ; + + if(!mGroupIds.empty()) + { + o << *mGroupIds.begin() ; + + if(mGroupIds.size() > 1) + o << " ..." ; + } + + o << "]" ; + + return o; +} +std::ostream& GroupIdReq::print(std::ostream& o) const +{ + return o << "[Request type=GroupIdReq" << "]" ; +} + +std::ostream& GroupSerializedDataReq::print(std::ostream& o) const +{ + return o << "[Request type=GroupSerializedData" << "]" ; +} + +std::ostream& GroupDataReq::print(std::ostream& o) const +{ + o << "[Request type=GroupDataReq groupIds (size=" << mGroupIds.size() << "): " ; + + if(!mGroupIds.empty()) + { + o << *mGroupIds.begin() ; + + if(mGroupIds.size() > 1) + o << " ..." ; + } + + o << "]" ; + + return o; +} + +std::ostream& MsgIdReq::print(std::ostream& o) const +{ + return o << "[Request type=MsgId" << "]" ; +} + +std::ostream& MsgMetaReq::print(std::ostream& o) const +{ + o << "[Request type=MsgMetaReq groups (size=" << mMsgIds.size() << "): " ; + + if(!mMsgIds.empty()) + { + o << mMsgIds.begin()->first << " (" << mMsgIds.begin()->second.size() << " messages)"; + + if(mMsgIds.size() > 1) + o << " ..." ; + } + + o << "]" ; + + return o; +} + +std::ostream& MsgDataReq::print(std::ostream& o) const +{ + o << "[Request type=MsgDataReq groups (size=" << mMsgIds.size() << "): " ; + + if(!mMsgIds.empty()) + { + o << mMsgIds.begin()->first << " (" << mMsgIds.begin()->second.size() << " messages)"; + + if(mMsgIds.size() > 1) + o << " ..." ; + } + + o << "]" ; + + return o; +} + +std::ostream& MsgRelatedInfoReq::print(std::ostream& o) const +{ + o << "[Request type=MsgRelatedInfo msgIds (size=" << mMsgIds.size() << "): " ; + + if(!mMsgIds.empty()) + { + o << mMsgIds.begin()->first ; + + if(mMsgIds.size() > 1) + o << " ..." ; + } + + o << "]" ; + + return o; +} + +std::ostream& GroupSetFlagReq::print(std::ostream& o) const +{ + return o << "[Request type=GroupFlagSet grpId=" << grpId << "]" ; +} + + + +std::ostream& ServiceStatisticRequest::print(std::ostream& o) const +{ + return o << "[Request type=ServiceStatistics" << "]" ; +} + +std::ostream& GroupStatisticRequest::print(std::ostream& o) const +{ + return o << "[Request type=GroupStatistics grpId=" << mGrpId << "]" ; +} + GroupMetaReq::~GroupMetaReq() { //rsstd::delete_all(mGroupMetaData.begin(), mGroupMetaData.end()); // now memory ownership is kept by the cache. @@ -57,3 +179,8 @@ MsgRelatedInfoReq::~MsgRelatedInfoReq() rsstd::delete_all(dataIt->second.begin(), dataIt->second.end()); } } +std::ostream& MessageSetFlagReq::print(std::ostream& o) const +{ + return o << "[Request type=MsgFlagSet" << "]" ; +} + diff --git a/libretroshare/src/gxs/rsgxsrequesttypes.h b/libretroshare/src/gxs/rsgxsrequesttypes.h index 7af50b135..e738a1923 100644 --- a/libretroshare/src/gxs/rsgxsrequesttypes.h +++ b/libretroshare/src/gxs/rsgxsrequesttypes.h @@ -29,25 +29,30 @@ struct GxsRequest { GxsRequest() : - token(0), reqTime(0), ansType(0), reqType(0), + token(0), reqTime(0), clientAnswerType(0), reqType(0), status(RsTokenService::FAILED) {} virtual ~GxsRequest() {} uint32_t token; uint32_t reqTime; - RS_DEPRECATED uint32_t ansType; /// G10h4ck: This is of no use + uint32_t clientAnswerType; /// This is made available to the clients in order to keep track of why specific requests where sent.. uint32_t reqType; RsTokReqOptions Options; RsTokenService::GxsRequestStatus status; + + virtual std::ostream& print(std::ostream& o) const = 0; }; +std::ostream& operator<<(std::ostream& o,const GxsRequest& g); + class GroupMetaReq : public GxsRequest { public: virtual ~GroupMetaReq(); + virtual std::ostream& print(std::ostream& o) const override; public: std::list mGroupIds; std::list mGroupMetaData; @@ -56,12 +61,16 @@ public: class GroupIdReq : public GxsRequest { public: + virtual std::ostream& print(std::ostream& o) const override ; + std::list mGroupIds; std::list mGroupIdResult; }; class GroupSerializedDataReq : public GxsRequest { public: + virtual std::ostream& print(std::ostream& o) const override ; + std::list mGroupIds; std::list mGroupData; }; @@ -71,6 +80,7 @@ class GroupDataReq : public GxsRequest public: virtual ~GroupDataReq(); + virtual std::ostream& print(std::ostream& o) const override; public: std::list mGroupIds; std::list mGroupData; @@ -79,6 +89,8 @@ public: class MsgIdReq : public GxsRequest { public: + virtual std::ostream& print(std::ostream& o) const override ; + GxsMsgReq mMsgIds; GxsMsgIdResult mMsgIdResult; }; @@ -88,6 +100,8 @@ class MsgMetaReq : public GxsRequest public: virtual ~MsgMetaReq(); + virtual std::ostream& print(std::ostream& o) const override; + public: GxsMsgReq mMsgIds; GxsMsgMetaResult mMsgMetaData; @@ -98,6 +112,7 @@ class MsgDataReq : public GxsRequest public: virtual ~MsgDataReq(); + virtual std::ostream& print(std::ostream& o) const override; public: GxsMsgReq mMsgIds; NxsMsgDataResult mMsgData; @@ -106,12 +121,15 @@ public: class ServiceStatisticRequest: public GxsRequest { public: + virtual std::ostream& print(std::ostream& o) const override ; GxsServiceStatistic mServiceStatistic; }; struct GroupStatisticRequest: public GxsRequest { public: + virtual std::ostream& print(std::ostream& o) const override ; + RsGxsGroupId mGrpId; GxsGroupStatistic mGroupStatistic; }; @@ -121,6 +139,7 @@ class MsgRelatedInfoReq : public GxsRequest public: virtual ~MsgRelatedInfoReq(); + std::ostream& print(std::ostream& o) const override; public: std::vector mMsgIds; MsgRelatedIdResult mMsgIdResult; @@ -131,6 +150,8 @@ public: class GroupSetFlagReq : public GxsRequest { public: + virtual std::ostream& print(std::ostream& o) const override ; + const static uint32_t FLAG_SUBSCRIBE; const static uint32_t FLAG_STATUS; @@ -145,6 +166,7 @@ class MessageSetFlagReq : public GxsRequest public: const static uint32_t FLAG_STATUS; + virtual std::ostream& print(std::ostream& o) const override ; uint8_t type; uint32_t flag; uint32_t flagMask; diff --git a/libretroshare/src/retroshare/rsgxschannels.h b/libretroshare/src/retroshare/rsgxschannels.h index 83f21a5e9..9cc4bd33e 100644 --- a/libretroshare/src/retroshare/rsgxschannels.h +++ b/libretroshare/src/retroshare/rsgxschannels.h @@ -111,6 +111,7 @@ enum class RsChannelEventCode: uint8_t SUBSCRIBE_STATUS_CHANGED = 0x06, // subscription for channel mChannelGroupId changed. READ_STATUS_CHANGED = 0x07, // existing message has been read or set to unread RECEIVED_DISTANT_SEARCH_RESULT = 0x08, // result for the given group id available for the given turtle request id + STATISTICS_CHANGED = 0x09, // stats (nb of supplier friends, how many msgs they have etc) has changed }; struct RsGxsChannelEvent: RsEvent diff --git a/libretroshare/src/retroshare/rsgxsforums.h b/libretroshare/src/retroshare/rsgxsforums.h index 5bff8e267..11a47cf7b 100644 --- a/libretroshare/src/retroshare/rsgxsforums.h +++ b/libretroshare/src/retroshare/rsgxsforums.h @@ -111,6 +111,7 @@ enum class RsForumEventCode: uint8_t UPDATED_MESSAGE = 0x04, /// existing message has been updated in a particular forum SUBSCRIBE_STATUS_CHANGED = 0x05, /// forum was subscribed or unsubscribed READ_STATUS_CHANGED = 0x06, /// msg was read or marked unread + STATISTICS_CHANGED = 0x07, /// suppliers and how many messages they have changed }; struct RsGxsForumEvent: RsEvent diff --git a/libretroshare/src/retroshare/rsgxsifacehelper.h b/libretroshare/src/retroshare/rsgxsifacehelper.h index ac8185b27..e2d04f887 100644 --- a/libretroshare/src/retroshare/rsgxsifacehelper.h +++ b/libretroshare/src/retroshare/rsgxsifacehelper.h @@ -40,7 +40,9 @@ * are necessary, so at this point this workaround seems acceptable. */ -#define DEBUG_GXSIFACEHELPER 1 +//================================== +// #define DEBUG_GXSIFACEHELPER 1 +//================================== enum class TokenRequestType: uint8_t { @@ -267,7 +269,9 @@ public: { RS_STACK_MUTEX(mMtx); mActiveTokens[token]=high_priority_request? (TokenRequestType::NO_KILL_TYPE) : token_request_type; +#ifdef DEBUG_GXSIFACEHELPER locked_dumpTokens(); +#endif return true; } else @@ -295,7 +299,9 @@ public: { RS_STACK_MUTEX(mMtx); mActiveTokens[token]=high_priority_request? (TokenRequestType::NO_KILL_TYPE) : token_request_type; +#ifdef DEBUG_GXSIFACEHELPER locked_dumpTokens(); +#endif return true; } else @@ -310,7 +316,9 @@ public: RS_STACK_MUTEX(mMtx); mActiveTokens[token]= (msgIds.size()==1 && msgIds.begin()->second.size()==0) ?(TokenRequestType::ALL_POSTS):(TokenRequestType::POSTS); +#ifdef DEBUG_GXSIFACEHELPER locked_dumpTokens(); +#endif return true; } else @@ -324,7 +332,9 @@ public: { RS_STACK_MUTEX(mMtx); mActiveTokens[token]=TokenRequestType::ALL_POSTS; +#ifdef DEBUG_GXSIFACEHELPER locked_dumpTokens(); +#endif return true; } else @@ -340,7 +350,9 @@ public: { RS_STACK_MUTEX(mMtx); mActiveTokens[token]=TokenRequestType::MSG_RELATED_INFO; +#ifdef DEBUG_GXSIFACEHELPER locked_dumpTokens(); +#endif return true; } else @@ -357,23 +369,33 @@ public: /// @see RsTokenService::requestServiceStatistic bool requestServiceStatistic(uint32_t& token) { - mTokenService.requestServiceStatistic(token); + RsTokReqOptions opts; + opts.mReqType = GXS_REQUEST_TYPE_SERVICE_STATS; + + mTokenService.requestServiceStatistic(token,opts); RS_STACK_MUTEX(mMtx); mActiveTokens[token]=TokenRequestType::SERVICE_STATISTICS; +#ifdef DEBUG_GXSIFACEHELPER locked_dumpTokens(); +#endif return true; } /// @see RsTokenService::requestGroupStatistic bool requestGroupStatistic(uint32_t& token, const RsGxsGroupId& grpId) { - mTokenService.requestGroupStatistic(token, grpId); + RsTokReqOptions opts; + opts.mReqType = GXS_REQUEST_TYPE_GROUP_STATS; + + mTokenService.requestGroupStatistic(token, grpId,opts); RS_STACK_MUTEX(mMtx); mActiveTokens[token]=TokenRequestType::GROUP_STATISTICS; +#ifdef DEBUG_GXSIFACEHELPER locked_dumpTokens(); +#endif return true; } @@ -485,7 +507,7 @@ private: uint32_t count[7] = {0}; - std::cerr << "Service " << std::hex << service_id << std::dec + RsDbg() << "Service " << std::hex << service_id << std::dec << " (" << rsServiceControl->getServiceName(RsServiceInfo::RsServiceInfoUIn16ToFullServiceId(service_id)) << ") this=" << std::hex << (void*)this << std::dec << ") Active tokens (per type): " ; diff --git a/libretroshare/src/retroshare/rsgxsifacetypes.h b/libretroshare/src/retroshare/rsgxsifacetypes.h index 60111d0cc..90715dfcb 100644 --- a/libretroshare/src/retroshare/rsgxsifacetypes.h +++ b/libretroshare/src/retroshare/rsgxsifacetypes.h @@ -45,6 +45,13 @@ struct RsMsgMetaData; typedef std::map > MsgMetaResult; +enum class GxsRequestPriority { + VERY_HIGH = 0x00, + HIGH = 0x01, + NORMAL = 0x02, + LOW = 0x03, + VERY_LOW = 0x04, +}; class RsGxsGrpMetaData; class RsGxsMsgMetaData; @@ -232,7 +239,7 @@ public: mNumChildMsgsNew = 0; mNumChildMsgsUnread = 0; mSizeStore = 0; - } + } public: uint32_t mNumMsgs; diff --git a/libretroshare/src/retroshare/rsgxsservice.h b/libretroshare/src/retroshare/rsgxsservice.h index cb4fcf7eb..4739b0ec2 100644 --- a/libretroshare/src/retroshare/rsgxsservice.h +++ b/libretroshare/src/retroshare/rsgxsservice.h @@ -46,7 +46,8 @@ struct RsGxsNotify TYPE_RECEIVED_NEW = 0x02, TYPE_PROCESSED = 0x03, TYPE_RECEIVED_PUBLISHKEY = 0x04, - TYPE_RECEIVED_DISTANT_SEARCH_RESULTS = 0x05 + TYPE_RECEIVED_DISTANT_SEARCH_RESULTS = 0x05, + TYPE_STATISTICS_CHANGED = 0x06 }; virtual ~RsGxsNotify() {} diff --git a/libretroshare/src/retroshare/rsposted.h b/libretroshare/src/retroshare/rsposted.h index 066232d5b..1a089de8e 100644 --- a/libretroshare/src/retroshare/rsposted.h +++ b/libretroshare/src/retroshare/rsposted.h @@ -114,6 +114,7 @@ enum class RsPostedEventCode: uint8_t UPDATED_POSTED_GROUP = 0x04, UPDATED_MESSAGE = 0x05, READ_STATUS_CHANGED = 0x06, + STATISTICS_CHANGED = 0x07, }; diff --git a/libretroshare/src/retroshare/rstokenservice.h b/libretroshare/src/retroshare/rstokenservice.h index 7f801927f..a0fbd7258 100644 --- a/libretroshare/src/retroshare/rstokenservice.h +++ b/libretroshare/src/retroshare/rstokenservice.h @@ -80,7 +80,7 @@ struct RsTokReqOptions { RsTokReqOptions() : mOptions(0), mStatusFilter(0), mStatusMask(0), mMsgFlagMask(0), mMsgFlagFilter(0), mReqType(0), mSubscribeFilter(0), - mSubscribeMask(0), mBefore(0), mAfter(0) {} + mSubscribeMask(0), mBefore(0), mAfter(0),mPriority(GxsRequestPriority::NORMAL) {} /** * Can be one or multiple RS_TOKREQOPT_* @@ -107,6 +107,8 @@ struct RsTokReqOptions // Time range... again applied after Options. rstime_t mBefore; rstime_t mAfter; + + GxsRequestPriority mPriority; }; /*! @@ -181,6 +183,25 @@ public: */ virtual bool requestMsgRelatedInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts, const std::vector& msgIds) = 0; + /*! + * This request statistics on amount of data held + * number of groups + * number of groups subscribed + * number of messages + * size of db store + * total size of messages + * total size of groups + * @param token + */ + virtual void requestServiceStatistic(uint32_t& token, const RsTokReqOptions &opts) = 0; + + /*! + * To request statistic on a group + * @param token set to value to be redeemed to get statistic + * @param grpId the id of the group + */ + virtual void requestGroupStatistic(uint32_t& token, const RsGxsGroupId& grpId, const RsTokReqOptions &opts) = 0; + /* Poll */ @@ -194,25 +215,6 @@ public: */ virtual GxsRequestStatus requestStatus(const uint32_t token) = 0; - /*! - * This request statistics on amount of data held - * number of groups - * number of groups subscribed - * number of messages - * size of db store - * total size of messages - * total size of groups - * @param token - */ - virtual void requestServiceStatistic(uint32_t& token) = 0; - - /*! - * To request statistic on a group - * @param token set to value to be redeemed to get statistic - * @param grpId the id of the group - */ - virtual void requestGroupStatistic(uint32_t& token, const RsGxsGroupId& grpId) = 0; - /*! * @brief Cancel Request * If this function returns false, it may be that the request has completed diff --git a/libretroshare/src/services/p3gxschannels.cc b/libretroshare/src/services/p3gxschannels.cc index 1e7538c57..9e0072d5b 100644 --- a/libretroshare/src/services/p3gxschannels.cc +++ b/libretroshare/src/services/p3gxschannels.cc @@ -302,7 +302,6 @@ void p3GxsChannels::notifyChanges(std::vector &changes) { switch (grpChange->getType()) { - default: case RsGxsNotify::TYPE_PROCESSED: // happens when the group is subscribed { std::list &grpList = grpChange->mGrpIdList; @@ -318,6 +317,20 @@ void p3GxsChannels::notifyChanges(std::vector &changes) } break; + case RsGxsNotify::TYPE_STATISTICS_CHANGED: + { + std::list &grpList = grpChange->mGrpIdList; + std::list::iterator git; + for (git = grpList.begin(); git != grpList.end(); ++git) + { + auto ev = std::make_shared(); + ev->mChannelGroupId = *git; + ev->mChannelEventCode = RsChannelEventCode::STATISTICS_CHANGED; + rsEvents->postEvent(ev); + } + } + break; + case RsGxsNotify::TYPE_PUBLISHED: case RsGxsNotify::TYPE_RECEIVED_NEW: { @@ -356,9 +369,14 @@ void p3GxsChannels::notifyChanges(std::vector &changes) rsEvents->postEvent(ev); } + } + break; + + default: + RsErr() << " Got a GXS event of type " << grpChange->getType() << " Currently not handled." << std::endl; break; } - } + } RsGxsDistantSearchResultChange *dsrChange = dynamic_cast(*it); diff --git a/libretroshare/src/services/p3gxsforums.cc b/libretroshare/src/services/p3gxsforums.cc index a37a62098..42b60fad2 100644 --- a/libretroshare/src/services/p3gxsforums.cc +++ b/libretroshare/src/services/p3gxsforums.cc @@ -246,7 +246,6 @@ void p3GxsForums::notifyChanges(std::vector &changes) { switch (grpChange->getType()) { - default: case RsGxsNotify::TYPE_PROCESSED: // happens when the group is subscribed { std::list &grpList = grpChange->mGrpIdList; @@ -260,7 +259,7 @@ void p3GxsForums::notifyChanges(std::vector &changes) } } - break; + break; case RsGxsNotify::TYPE_PUBLISHED: case RsGxsNotify::TYPE_RECEIVED_NEW: @@ -288,8 +287,26 @@ void p3GxsForums::notifyChanges(std::vector &changes) << " Not notifying already known forum " << *git << std::endl; } - break; } + break; + + case RsGxsNotify::TYPE_STATISTICS_CHANGED: + { + std::list &grpList = grpChange->mGrpIdList; + std::list::iterator git; + for (git = grpList.begin(); git != grpList.end(); ++git) + { + auto ev = std::make_shared(); + ev->mForumGroupId = *git; + ev->mForumEventCode = RsForumEventCode::STATISTICS_CHANGED; + rsEvents->postEvent(ev); + } + } + break; + default: + RsErr() << " Got a GXS event of type " << grpChange->getType() << " Currently not handled." << std::endl; + break; + #ifdef NOT_USED_YET case RsGxsNotify::TYPE_RECEIVED_PUBLISHKEY: diff --git a/libretroshare/src/services/p3postbase.cc b/libretroshare/src/services/p3postbase.cc index 8dd48c174..771103eb0 100644 --- a/libretroshare/src/services/p3postbase.cc +++ b/libretroshare/src/services/p3postbase.cc @@ -133,8 +133,7 @@ void p3PostBase::notifyChanges(std::vector &changes) #endif switch(grpChange->getType()) - { - default: + { case RsGxsNotify::TYPE_PROCESSED: // happens when the group is subscribed { std::list &grpList = grpChange->mGrpIdList; @@ -148,15 +147,30 @@ void p3PostBase::notifyChanges(std::vector &changes) } } - break; + break; - case RsGxsNotify::TYPE_PUBLISHED: - case RsGxsNotify::TYPE_RECEIVED_NEW: - { - /* group received */ - const std::list& grpList = grpChange->mGrpIdList; + case RsGxsNotify::TYPE_STATISTICS_CHANGED: + { + std::list &grpList = grpChange->mGrpIdList; + std::list::iterator git; - for (auto git = grpList.begin(); git != grpList.end(); ++git) + for (git = grpList.begin(); git != grpList.end(); ++git) + { + auto ev = std::make_shared(); + ev->mPostedGroupId = *git; + ev->mPostedEventCode = RsPostedEventCode::STATISTICS_CHANGED; + rsEvents->postEvent(ev); + } + } + break; + + case RsGxsNotify::TYPE_PUBLISHED: + case RsGxsNotify::TYPE_RECEIVED_NEW: + { + /* group received */ + const std::list& grpList = grpChange->mGrpIdList; + + for (auto git = grpList.begin(); git != grpList.end(); ++git) { if(mKnownPosted.find(*git) == mKnownPosted.end()) { @@ -178,9 +192,13 @@ void p3PostBase::notifyChanges(std::vector &changes) << " Not notifying already known forum " << *git << std::endl; } - } + } break; - } + + default: + RsErr() << " Got a GXS event of type " << grpChange->getType() << " Currently not handled." << std::endl; + break; + } } delete *it; diff --git a/retroshare-gui/src/gui/Posted/PostedDialog.cpp b/retroshare-gui/src/gui/Posted/PostedDialog.cpp index cbf7426d9..a6fdda0bd 100644 --- a/retroshare-gui/src/gui/Posted/PostedDialog.cpp +++ b/retroshare-gui/src/gui/Posted/PostedDialog.cpp @@ -62,13 +62,18 @@ void PostedDialog::handleEvent_main_thread(std::shared_ptr event) case RsPostedEventCode::NEW_MESSAGE: case RsPostedEventCode::UPDATED_MESSAGE: // [[fallthrough]]; case RsPostedEventCode::READ_STATUS_CHANGED: // [[fallthrough]]; - updateMessageSummaryList(e->mPostedGroupId); + updateGroupStatisticsReal(e->mPostedGroupId); // update the list immediately break; case RsPostedEventCode::NEW_POSTED_GROUP: // [[fallthrough]]; case RsPostedEventCode::SUBSCRIBE_STATUS_CHANGED: // [[fallthrough]]; updateDisplay(true); break; + + case RsPostedEventCode::STATISTICS_CHANGED: + updateGroupStatistics(e->mPostedGroupId); + break; + default: break; } } @@ -82,7 +87,7 @@ PostedDialog::~PostedDialog() UserNotify *PostedDialog::createUserNotify(QObject *parent) { - return new PostedUserNotify(rsPosted, parent); + return new PostedUserNotify(rsPosted, this, parent); } QString PostedDialog::getHelpString() const diff --git a/retroshare-gui/src/gui/Posted/PostedUserNotify.cpp b/retroshare-gui/src/gui/Posted/PostedUserNotify.cpp index 5e7d75040..ed7343557 100644 --- a/retroshare-gui/src/gui/Posted/PostedUserNotify.cpp +++ b/retroshare-gui/src/gui/Posted/PostedUserNotify.cpp @@ -22,8 +22,8 @@ #include "PostedUserNotify.h" #include "gui/MainWindow.h" -PostedUserNotify::PostedUserNotify(RsGxsIfaceHelper *ifaceImpl, QObject *parent) : - GxsUserNotify(ifaceImpl, parent) +PostedUserNotify::PostedUserNotify(RsGxsIfaceHelper *ifaceImpl, const GxsGroupFrameDialog *g, QObject *parent) : + GxsUserNotify(ifaceImpl, g, parent) { } @@ -34,10 +34,6 @@ bool PostedUserNotify::hasSetting(QString *name, QString *group) return true; } -bool PostedUserNotify::getServiceStatistics(GxsServiceStatistic& stat) -{ - return rsPosted->getBoardsServiceStatistics(stat); -} QIcon PostedUserNotify::getIcon() { diff --git a/retroshare-gui/src/gui/Posted/PostedUserNotify.h b/retroshare-gui/src/gui/Posted/PostedUserNotify.h index 8fd4ccdf9..2c12ac889 100644 --- a/retroshare-gui/src/gui/Posted/PostedUserNotify.h +++ b/retroshare-gui/src/gui/Posted/PostedUserNotify.h @@ -28,10 +28,9 @@ class PostedUserNotify : public GxsUserNotify Q_OBJECT public: - PostedUserNotify(RsGxsIfaceHelper *ifaceImpl, QObject *parent = 0); + PostedUserNotify(RsGxsIfaceHelper *ifaceImpl, const GxsGroupFrameDialog *g, QObject *parent = 0); virtual bool hasSetting(QString *name, QString *group); - virtual bool getServiceStatistics(GxsServiceStatistic& stat) override; private: virtual QIcon getIcon(); diff --git a/retroshare-gui/src/gui/gxs/GxsGroupFrameDialog.cpp b/retroshare-gui/src/gui/gxs/GxsGroupFrameDialog.cpp index aa9dde0ff..1245c80b2 100644 --- a/retroshare-gui/src/gui/gxs/GxsGroupFrameDialog.cpp +++ b/retroshare-gui/src/gui/gxs/GxsGroupFrameDialog.cpp @@ -59,6 +59,8 @@ #define MAX_COMMENT_TITLE 32 +static const uint32_t DELAY_BETWEEN_GROUP_STATISTICS_UPDATE = 120; // do not update group statistics more often than once every 2 mins + /* * Transformation Notes: * there are still a couple of things that the new groups differ from Old version. @@ -76,6 +78,9 @@ GxsGroupFrameDialog::GxsGroupFrameDialog(RsGxsIfaceHelper *ifaceImpl, QWidget *p ui = new Ui::GxsGroupFrameDialog(); ui->setupUi(this); + mShouldUpdateMessageSummaryList = true; + mShouldUpdateGroupStatistics = false; + mLastGroupStatisticsUpdateTs=0; mInitialized = false; mDistSyncAllowed = allow_dist_sync; mInFill = false; @@ -182,7 +187,43 @@ void GxsGroupFrameDialog::showEvent(QShowEvent *event) initUi(); } - updateDisplay( mCachedGroupMetas.empty() ); + uint32_t children = mYourGroups->childCount() + mSubscribedGroups->childCount() + mPopularGroups->childCount() + mOtherGroups->childCount(); + + bool empty = mCachedGroupMetas.empty() || children==0; + + updateDisplay( empty ); +} + +void GxsGroupFrameDialog::paintEvent(QPaintEvent *pe) +{ + if(mShouldUpdateMessageSummaryList) + { + if(!mGroupIdsSummaryToUpdate.empty()) + for(auto& group_id: mGroupIdsSummaryToUpdate) + updateMessageSummaryListReal(group_id); + else + updateMessageSummaryListReal(RsGxsGroupId()); + + mShouldUpdateMessageSummaryList = false; + mGroupIdsSummaryToUpdate.clear(); + } + + rstime_t now = time(nullptr); + + if(mShouldUpdateGroupStatistics && now > DELAY_BETWEEN_GROUP_STATISTICS_UPDATE + mLastGroupStatisticsUpdateTs) + { + // This mechanism allows to gather multiple updateGroupStatistics events at once and not send too many of them at the same time. + // it avoids re-loadign all the group everytime a friend sends new statistics. + + for(auto& groupId: mGroupStatisticsToUpdate) + updateGroupStatisticsReal(groupId); + + mShouldUpdateGroupStatistics = false; + mLastGroupStatisticsUpdateTs = time(nullptr); + mGroupStatisticsToUpdate.clear(); + } + + MainPage::paintEvent(pe); } void GxsGroupFrameDialog::processSettings(bool load) @@ -988,6 +1029,18 @@ void GxsGroupFrameDialog::insertGroupsData(const std::listgroupTreeWidget->setUnreadCount(item, mCountChildMsgs ? (stats.mNumThreadMsgsUnread + stats.mNumChildMsgsUnread) : stats.mNumThreadMsgsUnread); + mCachedGroupStats[groupId] = stats; getUserNotify()->updateIcon(); @@ -1103,6 +1163,23 @@ void GxsGroupFrameDialog::updateGroupStatistics(const RsGxsGroupId &groupId) }); } +void GxsGroupFrameDialog::getServiceStatistics(GxsServiceStatistic& stats) const +{ + stats = GxsServiceStatistic(); // clears everything + + for(auto it: mCachedGroupStats) + { + const GxsGroupStatistic& s(it.second); + + stats.mNumMsgs += s.mNumMsgs; + stats.mNumGrps += 1; + stats.mSizeOfMsgs += s.mTotalSizeOfMsgs; + stats.mNumThreadMsgsNew += s.mNumThreadMsgsNew; + stats.mNumThreadMsgsUnread += s.mNumThreadMsgsUnread; + stats.mNumChildMsgsNew += s.mNumChildMsgsNew ; + stats.mNumChildMsgsUnread += s.mNumChildMsgsUnread ; + } +} TurtleRequestId GxsGroupFrameDialog::distantSearch(const QString& search_string) // this should be overloaded in the child class { diff --git a/retroshare-gui/src/gui/gxs/GxsGroupFrameDialog.h b/retroshare-gui/src/gui/gxs/GxsGroupFrameDialog.h index 4aef6a6b1..902e49087 100644 --- a/retroshare-gui/src/gui/gxs/GxsGroupFrameDialog.h +++ b/retroshare-gui/src/gui/gxs/GxsGroupFrameDialog.h @@ -29,7 +29,6 @@ #include -#include "util/TokenQueue.h" #include "GxsIdTreeWidgetItem.h" #include "GxsGroupDialog.h" @@ -80,8 +79,11 @@ public: virtual void getGroupList(std::map &groups) ; + void getServiceStatistics(GxsServiceStatistic& stats) const ; + protected: - virtual void showEvent(QShowEvent *event); + virtual void showEvent(QShowEvent *event) override; + virtual void paintEvent(QPaintEvent *pe) override; virtual void updateDisplay(bool complete); const RsGxsGroupId &groupId() { return mGroupId; } @@ -102,6 +104,10 @@ protected: virtual bool getGroupData(std::list& groupInfo) =0; virtual bool getGroupStatistics(const RsGxsGroupId& groupId,GxsGroupStatistic& stat) =0; + + void updateGroupStatisticsReal(const RsGxsGroupId &groupId); + void updateMessageSummaryListReal(RsGxsGroupId groupId); + private slots: void todo(); @@ -173,20 +179,13 @@ private: virtual uint32_t requestGroupSummaryType() { return GXS_REQUEST_TYPE_GROUP_META; } // request only meta data - void requestGroupStatistics(const RsGxsGroupId &groupId); - void loadGroupStatistics(const uint32_t &token); - // subscribe/unsubscribe ack. -// void acknowledgeSubscribeChange(const uint32_t &token); GxsMessageFrameWidget *messageWidget(const RsGxsGroupId &groupId, bool ownTab); GxsMessageFrameWidget *createMessageWidget(const RsGxsGroupId &groupId); GxsCommentDialog *commentWidget(const RsGxsMessageId &msgId); -// void requestGroupSummary_CurrentGroup(const RsGxsGroupId &groupId); -// void loadGroupSummary_CurrentGroup(const uint32_t &token); - protected: void updateSearchResults(); @@ -209,12 +208,23 @@ private: RsGxsGroupId mNavigatePendingGroupId; RsGxsMessageId mNavigatePendingMsgId; + // Message summary list update + + bool mShouldUpdateMessageSummaryList ; // whether we should update the counting for groups. This takes some CPU so we only do it when needed. + std::set mGroupIdsSummaryToUpdate; + + // GroupStatistics update + bool mShouldUpdateGroupStatistics; + rstime_t mLastGroupStatisticsUpdateTs; + std::set mGroupStatisticsToUpdate; + UIStateHelper *mStateHelper; /** Qt Designer generated object */ Ui::GxsGroupFrameDialog *ui; std::map mCachedGroupMetas; + std::map mCachedGroupStats; std::map mSearchGroupsItems ; std::map > mKnownGroups; diff --git a/retroshare-gui/src/gui/gxs/GxsUserNotify.cpp b/retroshare-gui/src/gui/gxs/GxsUserNotify.cpp index b6ebc89af..bc1afa1c1 100644 --- a/retroshare-gui/src/gui/gxs/GxsUserNotify.cpp +++ b/retroshare-gui/src/gui/gxs/GxsUserNotify.cpp @@ -26,7 +26,7 @@ #define TOKEN_TYPE_STATISTICS 1 -GxsUserNotify::GxsUserNotify(RsGxsIfaceHelper *ifaceImpl, QObject *parent) : UserNotify(parent) +GxsUserNotify::GxsUserNotify(RsGxsIfaceHelper *ifaceImpl, const GxsGroupFrameDialog *g,QObject *parent) : UserNotify(parent), mGroupFrameDialog(g) { mNewThreadMessageCount = 0; mNewChildMessageCount = 0; @@ -40,32 +40,17 @@ void GxsUserNotify::startUpdate() mNewThreadMessageCount = 0; mNewChildMessageCount = 0; - RsThread::async([this]() - { - // 1 - get message data from p3GxsForums -#ifdef DEBUG_FORUMS - std::cerr << "Retrieving post data for post " << mThreadId << std::endl; -#endif + GxsServiceStatistic stats; + mGroupFrameDialog->getServiceStatistics(stats); - GxsServiceStatistic stats; - - if(!getServiceStatistics(stats)) - return; - - RsQThreadUtils::postToObject( [stats,this]() - { - /* Here it goes any code you want to be executed on the Qt Gui + /* Here it goes any code you want to be executed on the Qt Gui * thread, for example to update the data model with new information * after a blocking call to RetroShare API complete */ - mNewThreadMessageCount = stats.mNumThreadMsgsNew; - mNewChildMessageCount = stats.mNumChildMsgsNew; + mNewThreadMessageCount = stats.mNumThreadMsgsNew; + mNewChildMessageCount = stats.mNumChildMsgsNew; - update(); - - }, this ); - - }); + update(); } diff --git a/retroshare-gui/src/gui/gxs/GxsUserNotify.h b/retroshare-gui/src/gui/gxs/GxsUserNotify.h index cc1cd3809..f9c5cd616 100644 --- a/retroshare-gui/src/gui/gxs/GxsUserNotify.h +++ b/retroshare-gui/src/gui/gxs/GxsUserNotify.h @@ -23,6 +23,7 @@ #include #include "gui/common/UserNotify.h" +#include "gui/gxs/GxsGroupFrameDialog.h" #include "util/TokenQueue.h" struct RsGxsIfaceHelper; @@ -33,12 +34,11 @@ class GxsUserNotify : public UserNotify Q_OBJECT public: - GxsUserNotify(RsGxsIfaceHelper *ifaceImpl, QObject *parent = 0); + GxsUserNotify(RsGxsIfaceHelper *ifaceImpl, const GxsGroupFrameDialog *g, QObject *parent = 0); virtual ~GxsUserNotify(); protected: virtual void startUpdate(); - virtual bool getServiceStatistics(GxsServiceStatistic& stat)=0; private: virtual unsigned int getNewCount() { return mCountChildMsgs ? (mNewThreadMessageCount + mNewChildMessageCount) : mNewThreadMessageCount; } @@ -48,6 +48,8 @@ protected: private: RsGxsUpdateBroadcastBase *mBase; + const GxsGroupFrameDialog *mGroupFrameDialog; + unsigned int mNewThreadMessageCount; unsigned int mNewChildMessageCount; }; diff --git a/retroshare-gui/src/gui/gxschannels/GxsChannelDialog.cpp b/retroshare-gui/src/gui/gxschannels/GxsChannelDialog.cpp index 3d19f00f9..70f3de04a 100644 --- a/retroshare-gui/src/gui/gxschannels/GxsChannelDialog.cpp +++ b/retroshare-gui/src/gui/gxschannels/GxsChannelDialog.cpp @@ -64,10 +64,10 @@ void GxsChannelDialog::handleEvent_main_thread(std::shared_ptr ev switch(e->mChannelEventCode) { - case RsChannelEventCode::NEW_MESSAGE: - case RsChannelEventCode::UPDATED_MESSAGE: // [[fallthrough]]; - case RsChannelEventCode::READ_STATUS_CHANGED: - updateMessageSummaryList(e->mChannelGroupId); + case RsChannelEventCode::NEW_MESSAGE: // [[fallthrough]]; + case RsChannelEventCode::UPDATED_MESSAGE: // [[fallthrough]]; + case RsChannelEventCode::READ_STATUS_CHANGED: // [[fallthrough]]; + updateGroupStatisticsReal(e->mChannelGroupId); // update the list immediately break; case RsChannelEventCode::RECEIVED_DISTANT_SEARCH_RESULT: @@ -80,6 +80,10 @@ void GxsChannelDialog::handleEvent_main_thread(std::shared_ptr ev updateDisplay(true); break; + case RsChannelEventCode::STATISTICS_CHANGED: + updateGroupStatistics(e->mChannelGroupId); + break; + default: break; } @@ -109,7 +113,7 @@ QString GxsChannelDialog::getHelpString() const UserNotify *GxsChannelDialog::createUserNotify(QObject *parent) { - return new GxsChannelUserNotify(rsGxsChannels, parent); + return new GxsChannelUserNotify(rsGxsChannels,this, parent); } void GxsChannelDialog::shareOnChannel(const RsGxsGroupId& channel_id,const QList& file_links) diff --git a/retroshare-gui/src/gui/gxschannels/GxsChannelUserNotify.cpp b/retroshare-gui/src/gui/gxschannels/GxsChannelUserNotify.cpp index 3e5212476..eb54a39c3 100644 --- a/retroshare-gui/src/gui/gxschannels/GxsChannelUserNotify.cpp +++ b/retroshare-gui/src/gui/gxschannels/GxsChannelUserNotify.cpp @@ -22,8 +22,8 @@ #include "GxsChannelUserNotify.h" #include "gui/MainWindow.h" -GxsChannelUserNotify::GxsChannelUserNotify(RsGxsIfaceHelper *ifaceImpl, QObject *parent) : - GxsUserNotify(ifaceImpl, parent) +GxsChannelUserNotify::GxsChannelUserNotify(RsGxsIfaceHelper *ifaceImpl, const GxsGroupFrameDialog *g, QObject *parent) : + GxsUserNotify(ifaceImpl, g, parent) { } @@ -35,11 +35,6 @@ bool GxsChannelUserNotify::hasSetting(QString *name, QString *group) return true; } -bool GxsChannelUserNotify::getServiceStatistics(GxsServiceStatistic& stat) -{ - return rsGxsChannels->getChannelServiceStatistics(stat); -} - QIcon GxsChannelUserNotify::getIcon() { return QIcon(":/icons/png/channel.png"); diff --git a/retroshare-gui/src/gui/gxschannels/GxsChannelUserNotify.h b/retroshare-gui/src/gui/gxschannels/GxsChannelUserNotify.h index d69848aba..df29a0202 100644 --- a/retroshare-gui/src/gui/gxschannels/GxsChannelUserNotify.h +++ b/retroshare-gui/src/gui/gxschannels/GxsChannelUserNotify.h @@ -22,16 +22,16 @@ #define GXSCHANNELUSERNOTIFY_H #include "gui/gxs/GxsUserNotify.h" +#include "gui/gxs/GxsGroupFrameDialog.h" class GxsChannelUserNotify : public GxsUserNotify { Q_OBJECT public: - GxsChannelUserNotify(RsGxsIfaceHelper *ifaceImpl, QObject *parent = 0); + GxsChannelUserNotify(RsGxsIfaceHelper *ifaceImpl, const GxsGroupFrameDialog *g, QObject *parent = 0); virtual bool hasSetting(QString *name, QString *group); - virtual bool getServiceStatistics(GxsServiceStatistic& stat) override; private: virtual QIcon getIcon(); diff --git a/retroshare-gui/src/gui/gxsforums/GxsForumUserNotify.cpp b/retroshare-gui/src/gui/gxsforums/GxsForumUserNotify.cpp index 8c07e2099..876aa3a95 100644 --- a/retroshare-gui/src/gui/gxsforums/GxsForumUserNotify.cpp +++ b/retroshare-gui/src/gui/gxsforums/GxsForumUserNotify.cpp @@ -22,8 +22,8 @@ #include "GxsForumUserNotify.h" #include "gui/MainWindow.h" -GxsForumUserNotify::GxsForumUserNotify(RsGxsIfaceHelper *ifaceImpl, QObject *parent) : - GxsUserNotify(ifaceImpl, parent) +GxsForumUserNotify::GxsForumUserNotify(RsGxsIfaceHelper *ifaceImpl, const GxsGroupFrameDialog *g, QObject *parent) : + GxsUserNotify(ifaceImpl, g, parent) { mCountChildMsgs = true; } @@ -35,10 +35,6 @@ bool GxsForumUserNotify::hasSetting(QString *name, QString *group) return true; } -bool GxsForumUserNotify::getServiceStatistics(GxsServiceStatistic& stat) -{ - return rsGxsForums->getForumServiceStatistics(stat); -} QIcon GxsForumUserNotify::getIcon() { diff --git a/retroshare-gui/src/gui/gxsforums/GxsForumUserNotify.h b/retroshare-gui/src/gui/gxsforums/GxsForumUserNotify.h index bc97550de..172a327ae 100644 --- a/retroshare-gui/src/gui/gxsforums/GxsForumUserNotify.h +++ b/retroshare-gui/src/gui/gxsforums/GxsForumUserNotify.h @@ -28,8 +28,7 @@ class GxsForumUserNotify : public GxsUserNotify Q_OBJECT public: - GxsForumUserNotify(RsGxsIfaceHelper *ifaceImpl, QObject *parent = 0); - virtual bool getServiceStatistics(GxsServiceStatistic& stat) override; + GxsForumUserNotify(RsGxsIfaceHelper *ifaceImpl, const GxsGroupFrameDialog *g, QObject *parent = 0); virtual bool hasSetting(QString *name, QString *group); diff --git a/retroshare-gui/src/gui/gxsforums/GxsForumsDialog.cpp b/retroshare-gui/src/gui/gxsforums/GxsForumsDialog.cpp index d8d300dde..1b0a36c6f 100644 --- a/retroshare-gui/src/gui/gxsforums/GxsForumsDialog.cpp +++ b/retroshare-gui/src/gui/gxsforums/GxsForumsDialog.cpp @@ -62,13 +62,18 @@ void GxsForumsDialog::handleEvent_main_thread(std::shared_ptr eve case RsForumEventCode::NEW_MESSAGE: case RsForumEventCode::UPDATED_MESSAGE: // [[fallthrough]]; case RsForumEventCode::READ_STATUS_CHANGED: - updateMessageSummaryList(e->mForumGroupId); + updateGroupStatisticsReal(e->mForumGroupId); // update the list immediately break; case RsForumEventCode::NEW_FORUM: // [[fallthrough]]; case RsForumEventCode::SUBSCRIBE_STATUS_CHANGED: updateDisplay(true); break; + + case RsForumEventCode::STATISTICS_CHANGED: + updateGroupStatistics(e->mForumGroupId); // update the list when redraw less often than once every 2 mins + break; + default: break; } @@ -133,7 +138,7 @@ void GxsForumsDialog::shareInMessage(const RsGxsGroupId& forum_id,const QList