From 384e7ba0357ad5e73005d2a2966c5fc83774a32e Mon Sep 17 00:00:00 2001 From: thunder2 Date: Thu, 13 Aug 2015 19:44:27 +0200 Subject: [PATCH] Reworked processing of requests in RsGxsDataAccess to prevent freezes of the gui. --- libretroshare/src/gxs/rsgxsdataaccess.cc | 246 ++++++++++-------- libretroshare/src/retroshare/rstokenservice.h | 1 + 2 files changed, 145 insertions(+), 102 deletions(-) diff --git a/libretroshare/src/gxs/rsgxsdataaccess.cc b/libretroshare/src/gxs/rsgxsdataaccess.cc index 55c4de7d6..e1c922d61 100644 --- a/libretroshare/src/gxs/rsgxsdataaccess.cc +++ b/libretroshare/src/gxs/rsgxsdataaccess.cc @@ -59,7 +59,7 @@ const uint8_t RsTokenService::GXS_REQUEST_V2_STATUS_FINISHED_INCOMPLETE = 3; const uint8_t RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE = 4; const uint8_t RsTokenService::GXS_REQUEST_V2_STATUS_DONE = 5; // ONCE ALL DATA RETRIEVED. - + const uint8_t RsTokenService::GXS_REQUEST_V2_STATUS_CANCELLED = 6; /*********** * #define DATA_DEBUG 1 @@ -367,13 +367,19 @@ uint32_t RsGxsDataAccess::requestStatus(uint32_t token) return status; } - - - - bool RsGxsDataAccess::cancelRequest(const uint32_t& token) { - return clearRequest(token); + RsStackMutex stack(mDataMutex); /****** LOCKED *****/ + + GxsRequest* req = locked_retrieveRequest(token); + if (!req) + { + return false; + } + + req->status = GXS_REQUEST_V2_STATUS_CANCELLED; + + return true; } bool RsGxsDataAccess::clearRequest(const uint32_t& token) @@ -389,7 +395,7 @@ bool RsGxsDataAccess::clearRequest(const uint32_t& token) } delete it->second; - mRequests.erase(it->first); + mRequests.erase(it); return true; } @@ -707,7 +713,6 @@ bool RsGxsDataAccess::getGroupList(const uint32_t& token, std::list toClear; - std::list::iterator cit; time_t now = time(NULL); + std::map::iterator it; { RsStackMutex stack(mDataMutex); /******* LOCKED *******/ - std::map::iterator it; + // process status of the requests + for (it = mRequests.begin(); it != mRequests.end(); ++it) + { + GxsRequest* req = it->second; + + switch (req->status) + { + case GXS_REQUEST_V2_STATUS_PENDING: + // process request later + break; + case GXS_REQUEST_V2_STATUS_PARTIAL: + // should not happen + req->status = GXS_REQUEST_V2_STATUS_COMPLETE; + break; + case GXS_REQUEST_V2_STATUS_DONE: +#ifdef DATA_DEBUG + std::cerr << "RsGxsDataAccess::processrequests() Clearing Done Request Token: " << req->token; + std::cerr << std::endl; +#endif + toClear.push_back(req->token); + break; + case GXS_REQUEST_V2_STATUS_CANCELLED: +#ifdef DATA_DEBUG + std::cerr << "RsGxsDataAccess::processrequests() Clearing Cancelled Request Token: " << req->token; + std::cerr << std::endl; +#endif + toClear.push_back(req->token); + break; + default: + if (now - req->reqTime > MAX_REQUEST_AGE) + { +#ifdef DATA_DEBUG + std::cerr << "RsGxsDataAccess::processrequests() Clearing Old Request Token: " << req->token; + std::cerr << std::endl; +#endif + toClear.push_back(req->token); + } + } + } + } // END OF MUTEX. + + // clear requests + std::list::iterator cit; + for (cit = toClear.begin(); cit != toClear.end(); ++cit) + { + clearRequest(*cit); + } + + // process requests + while (true) + { + GxsRequest* req = NULL; + { + RsStackMutex stack(mDataMutex); /******* LOCKED *******/ + + // get the first pending request + for (it = mRequests.begin(); it != mRequests.end(); ++it) + { + GxsRequest* reqCheck = it->second; + if (reqCheck->status == GXS_REQUEST_V2_STATUS_PENDING) + { + req = reqCheck; + req->status = GXS_REQUEST_V2_STATUS_PARTIAL; + break; + } + } + } // END OF MUTEX. + + if (!req) { + break; + } GroupMetaReq* gmr; GroupDataReq* gdr; @@ -739,101 +813,69 @@ void RsGxsDataAccess::processRequests() MsgMetaReq* mmr; MsgDataReq* mdr; MsgIdReq* mir; - MsgRelatedInfoReq* mri; - GroupStatisticRequest* gsr; - ServiceStatisticRequest* ssr; + MsgRelatedInfoReq* mri; + GroupStatisticRequest* gsr; + ServiceStatisticRequest* ssr; - for(it = mRequests.begin(); it != mRequests.end(); ++it) +#ifdef DATA_DEBUG + std::cerr << "RsGxsDataAccess::processRequests() Processing Token: " << req->token << " Status: " + << req->status << " ReqType: " << req->reqType << " Age: " + << now - req->reqTime << std::endl; +#endif + + /* PROCESS REQUEST! */ + bool ok = false; + + if((gmr = dynamic_cast(req)) != NULL) { - - GxsRequest* req = it->second; - if (req->status == GXS_REQUEST_V2_STATUS_PENDING) - { -#ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::processRequests() Processing Token: " << req->token << " Status: " - << req->status << " ReqType: " << req->reqType << " Age: " - << now - req->reqTime << std::endl; -#endif - - req->status = GXS_REQUEST_V2_STATUS_PARTIAL; - - /* PROCESS REQUEST! */ - - if((gmr = dynamic_cast(req)) != NULL) - { - getGroupSummary(gmr); - } - else if((gdr = dynamic_cast(req)) != NULL) - { - getGroupData(gdr); - } - else if((gir = dynamic_cast(req)) != NULL) - { - getGroupList(gir); - } - else if((mmr = dynamic_cast(req)) != NULL) - { - getMsgSummary(mmr); - } - else if((mdr = dynamic_cast(req)) != NULL) - { - getMsgData(mdr); - } - else if((mir = dynamic_cast(req)) != NULL) - { - getMsgList(mir); - } - else if((mri = dynamic_cast(req)) != NULL) - { - getMsgRelatedInfo(mri); - } - else if((gsr = dynamic_cast(req)) != NULL) - { - getGroupStatistic(gsr); - } - else if((ssr = dynamic_cast(req)) != NULL) - { - getServiceStatistic(ssr); - } - else - { - std::cerr << "RsGxsDataAccess::processRequests() Failed to process request, token: " - << req->token << std::endl; - - req->status = GXS_REQUEST_V2_STATUS_FAILED; - } - } - else if (req->status == GXS_REQUEST_V2_STATUS_PARTIAL) - { - req->status = GXS_REQUEST_V2_STATUS_COMPLETE; - } - else if (req->status == GXS_REQUEST_V2_STATUS_DONE) - { -#ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::processrequests() Clearing Done Request Token: " - << req->token; - std::cerr << std::endl; -#endif - toClear.push_back(req->token); - } - else if (now - req->reqTime > MAX_REQUEST_AGE) - { -#ifdef DATA_DEBUG - std::cerr << "RsGxsDataAccess::processrequests() Clearing Old Request Token: " << req->token; - std::cerr << std::endl; -#endif - toClear.push_back(req->token); - } + ok = getGroupSummary(gmr); + } + else if((gdr = dynamic_cast(req)) != NULL) + { + ok = getGroupData(gdr); + } + else if((gir = dynamic_cast(req)) != NULL) + { + ok = getGroupList(gir); + } + else if((mmr = dynamic_cast(req)) != NULL) + { + ok = getMsgSummary(mmr); + } + else if((mdr = dynamic_cast(req)) != NULL) + { + ok = getMsgData(mdr); + } + else if((mir = dynamic_cast(req)) != NULL) + { + ok = getMsgList(mir); + } + else if((mri = dynamic_cast(req)) != NULL) + { + ok = getMsgRelatedInfo(mri); + } + else if((gsr = dynamic_cast(req)) != NULL) + { + ok = getGroupStatistic(gsr); + } + else if((ssr = dynamic_cast(req)) != NULL) + { + ok = getServiceStatistic(ssr); + } + else + { + std::cerr << "RsGxsDataAccess::processRequests() Failed to process request, token: " + << req->token << std::endl; } - } // END OF MUTEX. - - for(cit = toClear.begin(); cit != toClear.end(); ++cit) - { - clearRequest(*cit); + { + RsStackMutex stack(mDataMutex); /******* LOCKED *******/ + if (req->status == GXS_REQUEST_V2_STATUS_PARTIAL) + { + req->status = ok ? GXS_REQUEST_V2_STATUS_COMPLETE : GXS_REQUEST_V2_STATUS_FAILED; + } + } // END OF MUTEX. } - - return; } bool RsGxsDataAccess::getGroupStatistic(const uint32_t &token, GxsGroupStatistic &grpStatistic) @@ -1719,7 +1761,7 @@ bool RsGxsDataAccess::checkRequestStatus(const uint32_t& token, GxsRequest* req = locked_retrieveRequest(token); - if(req == NULL) + if (req == NULL || req->status == GXS_REQUEST_V2_STATUS_CANCELLED) return false; anstype = req->ansType; diff --git a/libretroshare/src/retroshare/rstokenservice.h b/libretroshare/src/retroshare/rstokenservice.h index 97f406222..954148359 100644 --- a/libretroshare/src/retroshare/rstokenservice.h +++ b/libretroshare/src/retroshare/rstokenservice.h @@ -121,6 +121,7 @@ public: static const uint8_t GXS_REQUEST_V2_STATUS_FINISHED_INCOMPLETE; static const uint8_t GXS_REQUEST_V2_STATUS_COMPLETE; static const uint8_t GXS_REQUEST_V2_STATUS_DONE; // ONCE ALL DATA RETRIEVED. + static const uint8_t GXS_REQUEST_V2_STATUS_CANCELLED; public: