Reworked processing of requests in RsGxsDataAccess to prevent freezes of the gui.

This commit is contained in:
thunder2 2015-08-13 19:44:27 +02:00
parent 4a50a62cc8
commit 384e7ba035
2 changed files with 145 additions and 102 deletions

View File

@ -59,7 +59,7 @@
const uint8_t RsTokenService::GXS_REQUEST_V2_STATUS_FINISHED_INCOMPLETE = 3;
const uint8_t RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE = 4;
const uint8_t RsTokenService::GXS_REQUEST_V2_STATUS_DONE = 5; // ONCE ALL DATA RETRIEVED.
const uint8_t RsTokenService::GXS_REQUEST_V2_STATUS_CANCELLED = 6;
/***********
* #define DATA_DEBUG 1
@ -367,13 +367,19 @@ uint32_t RsGxsDataAccess::requestStatus(uint32_t token)
return status;
}
bool RsGxsDataAccess::cancelRequest(const uint32_t& token)
{
return clearRequest(token);
RsStackMutex stack(mDataMutex); /****** LOCKED *****/
GxsRequest* req = locked_retrieveRequest(token);
if (!req)
{
return false;
}
req->status = GXS_REQUEST_V2_STATUS_CANCELLED;
return true;
}
bool RsGxsDataAccess::clearRequest(const uint32_t& token)
@ -389,7 +395,7 @@ bool RsGxsDataAccess::clearRequest(const uint32_t& token)
}
delete it->second;
mRequests.erase(it->first);
mRequests.erase(it);
return true;
}
@ -707,7 +713,6 @@ bool RsGxsDataAccess::getGroupList(const uint32_t& token, std::list<RsGxsGroupId
return true;
}
GxsRequest* RsGxsDataAccess::locked_retrieveRequest(const uint32_t& token)
{
@ -722,15 +727,84 @@ GxsRequest* RsGxsDataAccess::locked_retrieveRequest(const uint32_t& token)
void RsGxsDataAccess::processRequests()
{
std::list<uint32_t> toClear;
std::list<uint32_t>::iterator cit;
time_t now = time(NULL);
std::map<uint32_t, GxsRequest*>::iterator it;
{
RsStackMutex stack(mDataMutex); /******* LOCKED *******/
std::map<uint32_t, GxsRequest*>::iterator it;
// process status of the requests
for (it = mRequests.begin(); it != mRequests.end(); ++it)
{
GxsRequest* req = it->second;
switch (req->status)
{
case GXS_REQUEST_V2_STATUS_PENDING:
// process request later
break;
case GXS_REQUEST_V2_STATUS_PARTIAL:
// should not happen
req->status = GXS_REQUEST_V2_STATUS_COMPLETE;
break;
case GXS_REQUEST_V2_STATUS_DONE:
#ifdef DATA_DEBUG
std::cerr << "RsGxsDataAccess::processrequests() Clearing Done Request Token: " << req->token;
std::cerr << std::endl;
#endif
toClear.push_back(req->token);
break;
case GXS_REQUEST_V2_STATUS_CANCELLED:
#ifdef DATA_DEBUG
std::cerr << "RsGxsDataAccess::processrequests() Clearing Cancelled Request Token: " << req->token;
std::cerr << std::endl;
#endif
toClear.push_back(req->token);
break;
default:
if (now - req->reqTime > MAX_REQUEST_AGE)
{
#ifdef DATA_DEBUG
std::cerr << "RsGxsDataAccess::processrequests() Clearing Old Request Token: " << req->token;
std::cerr << std::endl;
#endif
toClear.push_back(req->token);
}
}
}
} // END OF MUTEX.
// clear requests
std::list<uint32_t>::iterator cit;
for (cit = toClear.begin(); cit != toClear.end(); ++cit)
{
clearRequest(*cit);
}
// process requests
while (true)
{
GxsRequest* req = NULL;
{
RsStackMutex stack(mDataMutex); /******* LOCKED *******/
// get the first pending request
for (it = mRequests.begin(); it != mRequests.end(); ++it)
{
GxsRequest* reqCheck = it->second;
if (reqCheck->status == GXS_REQUEST_V2_STATUS_PENDING)
{
req = reqCheck;
req->status = GXS_REQUEST_V2_STATUS_PARTIAL;
break;
}
}
} // END OF MUTEX.
if (!req) {
break;
}
GroupMetaReq* gmr;
GroupDataReq* gdr;
@ -739,101 +813,69 @@ void RsGxsDataAccess::processRequests()
MsgMetaReq* mmr;
MsgDataReq* mdr;
MsgIdReq* mir;
MsgRelatedInfoReq* mri;
GroupStatisticRequest* gsr;
ServiceStatisticRequest* ssr;
MsgRelatedInfoReq* mri;
GroupStatisticRequest* gsr;
ServiceStatisticRequest* ssr;
for(it = mRequests.begin(); it != mRequests.end(); ++it)
#ifdef DATA_DEBUG
std::cerr << "RsGxsDataAccess::processRequests() Processing Token: " << req->token << " Status: "
<< req->status << " ReqType: " << req->reqType << " Age: "
<< now - req->reqTime << std::endl;
#endif
/* PROCESS REQUEST! */
bool ok = false;
if((gmr = dynamic_cast<GroupMetaReq*>(req)) != NULL)
{
GxsRequest* req = it->second;
if (req->status == GXS_REQUEST_V2_STATUS_PENDING)
{
#ifdef DATA_DEBUG
std::cerr << "RsGxsDataAccess::processRequests() Processing Token: " << req->token << " Status: "
<< req->status << " ReqType: " << req->reqType << " Age: "
<< now - req->reqTime << std::endl;
#endif
req->status = GXS_REQUEST_V2_STATUS_PARTIAL;
/* PROCESS REQUEST! */
if((gmr = dynamic_cast<GroupMetaReq*>(req)) != NULL)
{
getGroupSummary(gmr);
}
else if((gdr = dynamic_cast<GroupDataReq*>(req)) != NULL)
{
getGroupData(gdr);
}
else if((gir = dynamic_cast<GroupIdReq*>(req)) != NULL)
{
getGroupList(gir);
}
else if((mmr = dynamic_cast<MsgMetaReq*>(req)) != NULL)
{
getMsgSummary(mmr);
}
else if((mdr = dynamic_cast<MsgDataReq*>(req)) != NULL)
{
getMsgData(mdr);
}
else if((mir = dynamic_cast<MsgIdReq*>(req)) != NULL)
{
getMsgList(mir);
}
else if((mri = dynamic_cast<MsgRelatedInfoReq*>(req)) != NULL)
{
getMsgRelatedInfo(mri);
}
else if((gsr = dynamic_cast<GroupStatisticRequest*>(req)) != NULL)
{
getGroupStatistic(gsr);
}
else if((ssr = dynamic_cast<ServiceStatisticRequest*>(req)) != NULL)
{
getServiceStatistic(ssr);
}
else
{
std::cerr << "RsGxsDataAccess::processRequests() Failed to process request, token: "
<< req->token << std::endl;
req->status = GXS_REQUEST_V2_STATUS_FAILED;
}
}
else if (req->status == GXS_REQUEST_V2_STATUS_PARTIAL)
{
req->status = GXS_REQUEST_V2_STATUS_COMPLETE;
}
else if (req->status == GXS_REQUEST_V2_STATUS_DONE)
{
#ifdef DATA_DEBUG
std::cerr << "RsGxsDataAccess::processrequests() Clearing Done Request Token: "
<< req->token;
std::cerr << std::endl;
#endif
toClear.push_back(req->token);
}
else if (now - req->reqTime > MAX_REQUEST_AGE)
{
#ifdef DATA_DEBUG
std::cerr << "RsGxsDataAccess::processrequests() Clearing Old Request Token: " << req->token;
std::cerr << std::endl;
#endif
toClear.push_back(req->token);
}
ok = getGroupSummary(gmr);
}
else if((gdr = dynamic_cast<GroupDataReq*>(req)) != NULL)
{
ok = getGroupData(gdr);
}
else if((gir = dynamic_cast<GroupIdReq*>(req)) != NULL)
{
ok = getGroupList(gir);
}
else if((mmr = dynamic_cast<MsgMetaReq*>(req)) != NULL)
{
ok = getMsgSummary(mmr);
}
else if((mdr = dynamic_cast<MsgDataReq*>(req)) != NULL)
{
ok = getMsgData(mdr);
}
else if((mir = dynamic_cast<MsgIdReq*>(req)) != NULL)
{
ok = getMsgList(mir);
}
else if((mri = dynamic_cast<MsgRelatedInfoReq*>(req)) != NULL)
{
ok = getMsgRelatedInfo(mri);
}
else if((gsr = dynamic_cast<GroupStatisticRequest*>(req)) != NULL)
{
ok = getGroupStatistic(gsr);
}
else if((ssr = dynamic_cast<ServiceStatisticRequest*>(req)) != NULL)
{
ok = getServiceStatistic(ssr);
}
else
{
std::cerr << "RsGxsDataAccess::processRequests() Failed to process request, token: "
<< req->token << std::endl;
}
} // END OF MUTEX.
for(cit = toClear.begin(); cit != toClear.end(); ++cit)
{
clearRequest(*cit);
{
RsStackMutex stack(mDataMutex); /******* LOCKED *******/
if (req->status == GXS_REQUEST_V2_STATUS_PARTIAL)
{
req->status = ok ? GXS_REQUEST_V2_STATUS_COMPLETE : GXS_REQUEST_V2_STATUS_FAILED;
}
} // END OF MUTEX.
}
return;
}
bool RsGxsDataAccess::getGroupStatistic(const uint32_t &token, GxsGroupStatistic &grpStatistic)
@ -1719,7 +1761,7 @@ bool RsGxsDataAccess::checkRequestStatus(const uint32_t& token,
GxsRequest* req = locked_retrieveRequest(token);
if(req == NULL)
if (req == NULL || req->status == GXS_REQUEST_V2_STATUS_CANCELLED)
return false;
anstype = req->ansType;

View File

@ -121,6 +121,7 @@ public:
static const uint8_t GXS_REQUEST_V2_STATUS_FINISHED_INCOMPLETE;
static const uint8_t GXS_REQUEST_V2_STATUS_COMPLETE;
static const uint8_t GXS_REQUEST_V2_STATUS_DONE; // ONCE ALL DATA RETRIEVED.
static const uint8_t GXS_REQUEST_V2_STATUS_CANCELLED;
public: