started to re-write the getMsgMeta and Ids in gxsdataaccess for better efficiency. Lastest msgs only net yet functional

This commit is contained in:
csoler 2020-04-09 22:55:13 +02:00
parent b0e61376f1
commit 4e66455ac0
No known key found for this signature in database
GPG Key ID: 7BCA522266C0804C
3 changed files with 126 additions and 107 deletions

View File

@ -1193,7 +1193,7 @@ bool RsGenExchange::getGroupList(const uint32_t &token, std::list<RsGxsGroupId>
bool RsGenExchange::getMsgList(const uint32_t &token,
GxsMsgIdResult &msgIds)
{
return mDataAccess->getMsgList(token, msgIds);
return mDataAccess->getMsgIdList(token, msgIds);
}
bool RsGenExchange::getMsgRelatedList(const uint32_t &token, MsgRelatedIdResult &msgIds)

View File

@ -603,7 +603,7 @@ bool RsGxsDataAccess::getMsgRelatedList(const uint32_t &token, MsgRelatedIdResul
return true;
}
bool RsGxsDataAccess::getMsgList(const uint32_t& token, GxsMsgIdResult& msgIds)
bool RsGxsDataAccess::getMsgIdList(const uint32_t& token, GxsMsgIdResult& msgIds)
{
RsStackMutex stack(mDataMutex);
@ -805,7 +805,7 @@ void RsGxsDataAccess::processRequests()
}
else if((mir = dynamic_cast<MsgIdReq*>(req)) != NULL)
{
ok = getMsgList(mir);
ok = getMsgIdList(mir);
}
else if((mri = dynamic_cast<MsgRelatedInfoReq*>(req)) != NULL)
{
@ -961,7 +961,7 @@ bool RsGxsDataAccess::getMsgData(MsgDataReq* req)
const RsTokReqOptions& opts(req->Options);
// filter based on options
getMsgList(req->mMsgIds, opts, msgIdOut);
getMsgIdList(req->mMsgIds, opts, msgIdOut);
// If the list is empty because of filtering do not retrieve from DB
if((opts.mMsgFlagMask || opts.mStatusMask) && msgIdOut.empty())
@ -979,24 +979,21 @@ bool RsGxsDataAccess::getMsgSummary(MsgMetaReq* req)
const RsTokReqOptions& opts(req->Options);
// filter based on options
getMsgList(req->mMsgIds, opts, msgIdOut);
getMsgMetaDataList(req->mMsgIds, opts, req->mMsgMetaData);
// If the list is empty because of filtering do not retrieve from DB
if((opts.mMsgFlagMask || opts.mStatusMask) && msgIdOut.empty())
return true;
mDataStore->retrieveGxsMsgMetaData(msgIdOut, req->mMsgMetaData);
// // If the list is empty because of filtering do not retrieve from DB
// if((opts.mMsgFlagMask || opts.mStatusMask) && msgIdOut.empty())
// return true;
//
// mDataStore->retrieveGxsMsgMetaData(msgIdOut, req->mMsgMetaData);
return true;
}
bool RsGxsDataAccess::getMsgList(
const GxsMsgReq& msgIds, const RsTokReqOptions& opts,
GxsMsgReq& msgIdsOut )
bool RsGxsDataAccess::getMsgMetaDataList( const GxsMsgReq& msgIds, const RsTokReqOptions& opts, GxsMsgMetaResult& result )
{
GxsMsgMetaResult result;
// First get all message metas, then filter out the ones we want to keep.
result.clear();
mDataStore->retrieveGxsMsgMetaData(msgIds, result);
/* CASEs this handles.
@ -1041,123 +1038,136 @@ bool RsGxsDataAccess::getMsgList(
}
GxsMsgMetaResult::iterator meta_it;
MsgMetaFilter metaFilter;
//MsgMetaFilter metaFilter;
for(meta_it = result.begin(); meta_it != result.end(); ++meta_it)
{
const RsGxsGroupId& grpId = meta_it->first;
metaFilter[grpId] = std::map<RsGxsMessageId, RsGxsMsgMetaData*>();
//auto& filter( metaFilter[grpId] ); // does the initialization of metaFilter[grpId] and avoids further O(log(n)) calls
std::vector<RsGxsMsgMetaData*>& metaV = meta_it->second;
const std::vector<RsGxsMsgMetaData*>& metaV = meta_it->second;
if (onlyLatestMsgs) // THIS ONE IS HARD -> LOTS OF COMP.
{
std::vector<RsGxsMsgMetaData*>::const_iterator vit = metaV.begin();
#ifdef TODO
// RUN THROUGH ALL MSGS... in map origId -> TS.
std::map<RsGxsMessageId, std::pair<RsGxsMessageId, rstime_t> > origMsgTs;
std::map<RsGxsMessageId, std::pair<RsGxsMessageId, rstime_t> >::iterator oit;
for(; vit != metaV.end(); ++vit)
for(uint32_t i=0;i<metaV.size();++i)
{
RsGxsMsgMetaData* msgMeta = *vit;
RsGxsMsgMetaData* msgMeta = metaV[i];
/* if we are grabbing thread Head... then parentId == empty. */
if (onlyThreadHeadMsgs)
if (onlyThreadHeadMsgs && !(msgMeta->mParentId.isNull()))
{
if (!(msgMeta->mParentId.isNull()))
delete msgMeta;
metaV[i] = nullptr;
continue;
}
auto oit = origMsgTs.find(msgMeta->mOrigMsgId);
bool addMsg = false;
if (oit != origMsgTs.end())
{
if(oit->second.second > msgMeta->mPublishTs)
{
continue;
#ifdef DATA_DEBUG
std::cerr << "RsGxsDataAccess::getMsgList() Found New OrigMsgId: ";
std::cerr << msgMeta->mOrigMsgId;
std::cerr << " MsgId: " << msgMeta->mMsgId;
std::cerr << " TS: " << msgMeta->mPublishTs;
std::cerr << std::endl;
#endif
origMsgTs[msgMeta->mOrigMsgId] = std::make_pair(msgMeta->mMsgId, msgMeta->mPublishTs); // add as latest. (overwriting if necessary)
}
}
oit = origMsgTs.find(msgMeta->mOrigMsgId);
bool addMsg = false;
if (oit == origMsgTs.end())
else
{
#ifdef DATA_DEBUG
std::cerr << "RsGxsDataAccess::getMsgList() Found New OrigMsgId: ";
std::cerr << msgMeta->mOrigMsgId;
std::cerr << " MsgId: " << msgMeta->mMsgId;
std::cerr << " TS: " << msgMeta->mPublishTs;
std::cerr << std::endl;
#endif
addMsg = true;
}
// check timestamps.
else if (oit->second.second < msgMeta->mPublishTs)
{
#ifdef DATA_DEBUG
std::cerr << "RsGxsDataAccess::getMsgList() Found Later Msg. OrigMsgId: ";
std::cerr << msgMeta->mOrigMsgId;
std::cerr << " MsgId: " << msgMeta->mMsgId;
std::cerr << " TS: " << msgMeta->mPublishTs;
#endif
addMsg = true;
delete msgMeta;
metaV[i] = nullptr;
continue;
}
if (addMsg)
{
// add as latest. (overwriting if necessary)
origMsgTs[msgMeta->mOrigMsgId] = std::make_pair(msgMeta->mMsgId, msgMeta->mPublishTs);
metaFilter[grpId].insert(std::make_pair(msgMeta->mMsgId, msgMeta));
}
}
// Add the discovered Latest Msgs.
for(oit = origMsgTs.begin(); oit != origMsgTs.end(); ++oit)
for(auto oit = origMsgTs.begin(); oit != origMsgTs.end(); ++oit)
{
msgIdsOut[grpId].insert(oit->second.first);
msgIdsOut[grpId].insert(oit->second.first);
}
#endif
}
else // ALL OTHER CASES.
{
std::vector<RsGxsMsgMetaData*>::const_iterator vit = metaV.begin();
for(; vit != metaV.end(); ++vit)
for(uint32_t i=0;i<metaV.size();++i)
{
RsGxsMsgMetaData* msgMeta = *vit;
RsGxsMsgMetaData* msgMeta = metaV[i];
bool add = false;
/* if we are grabbing thread Head... then parentId == empty. */
if (onlyThreadHeadMsgs)
if (onlyThreadHeadMsgs && !msgMeta->mParentId.isNull())
{
if (!(msgMeta->mParentId.isNull()))
{
continue;
}
delete msgMeta;
metaV[i] = nullptr;
continue;
}
if (onlyOrigMsgs)
{
if (msgMeta->mMsgId == msgMeta->mOrigMsgId)
{
add = true;
}
if (onlyOrigMsgs && !msgMeta->mOrigMsgId.isNull() && msgMeta->mMsgId != msgMeta->mOrigMsgId)
{
delete msgMeta;
metaV[i] = nullptr;
continue;
}
else
{
add = true;
}
if (add)
{
msgIdsOut[grpId].insert(msgMeta->mMsgId);
metaFilter[grpId].insert(std::make_pair(msgMeta->mMsgId, msgMeta));
}
}
}
}
filterMsgList(msgIdsOut, opts, metaFilter);
// collapse results while keeping the order, eliminating empty slots
metaFilter.clear();
for(auto it(result.begin());it!=result.end();++it)
{
uint32_t j=0; // j is the end of the cleaned-up tab, at the first available place
for(uint32_t i=0;i<it->second.size();++i) // i is the index in the tab possibly containing nullptr's
if(it->second[i] != nullptr)
{
it->second[j] = it->second[i]; // move the pointer to the first available place
++j;
}
it->second.resize(j); // normally all pointers have been moved forward so there is nothing to delete here.
}
// filterMsgIdList(msgIdsOut, opts, metaFilter); // this call is absurd: we already have in metaFilter the content we want.
//metaFilter.clear();
// delete meta data
//cleanseMsgMetaMap(result);
return true;
}
bool RsGxsDataAccess::getMsgIdList( const GxsMsgReq& msgIds, const RsTokReqOptions& opts, GxsMsgReq& msgIdsOut )
{
GxsMsgMetaResult result;
getMsgMetaDataList( msgIdsOut, opts, result );
// extract MessageIds
msgIdsOut.clear();
for(auto it(result.begin());it!=result.end();++it)
{
auto& id_set(msgIdsOut[it->first]);
for(uint32_t i=0;i<it->second.size();++i)
id_set.insert(it->second[i]->mMsgId);
}
// delete meta data
cleanseMsgMetaMap(result);
@ -1441,7 +1451,7 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req)
GxsMsgIdResult filteredOutMsgIds;
filteredOutMsgIds[grpId] = outMsgIds;
filterMsgList(filteredOutMsgIds, opts, filterMap);
filterMsgIdList(filteredOutMsgIds, opts, filterMap);
if(!filteredOutMsgIds[grpId].empty())
{
@ -1571,7 +1581,7 @@ bool RsGxsDataAccess::getServiceStatistic(ServiceStatisticRequest *req)
return true;
}
bool RsGxsDataAccess::getMsgList(MsgIdReq* req)
bool RsGxsDataAccess::getMsgIdList(MsgIdReq* req)
{
GxsMsgMetaResult result;
@ -1599,7 +1609,7 @@ bool RsGxsDataAccess::getMsgList(MsgIdReq* req)
GxsMsgReq msgIdOut;
// filter based on options
getMsgList(req->mMsgIdResult, req->Options, msgIdOut);
getMsgIdList(req->mMsgIdResult, req->Options, msgIdOut);
req->mMsgIdResult = msgIdOut;
return true;
@ -1624,12 +1634,9 @@ void RsGxsDataAccess::cleanseMsgMetaMap(GxsMsgMetaResult& result)
return;
}
void RsGxsDataAccess::filterMsgList(
GxsMsgIdResult& resultsMap, const RsTokReqOptions& opts,
const MsgMetaFilter& msgMetas ) const
void RsGxsDataAccess::filterMsgIdList( GxsMsgIdResult& resultsMap, const RsTokReqOptions& opts, const MsgMetaFilter& msgMetas ) const
{
for( GxsMsgIdResult::iterator grpIt = resultsMap.begin();
grpIt != resultsMap.end(); ++grpIt )
for( GxsMsgIdResult::iterator grpIt = resultsMap.begin(); grpIt != resultsMap.end(); ++grpIt )
{
const RsGxsGroupId& groupId(grpIt->first);
std::set<RsGxsMessageId>& msgsIdSet(grpIt->second);
@ -1642,8 +1649,7 @@ void RsGxsDataAccess::filterMsgList(
<< std::endl;
#endif
for( std::set<RsGxsMessageId>::iterator msgIdIt = msgsIdSet.begin();
msgIdIt != msgsIdSet.end(); )
for( std::set<RsGxsMessageId>::iterator msgIdIt = msgsIdSet.begin(); msgIdIt != msgsIdSet.end(); )
{
const RsGxsMessageId& msgId(*msgIdIt);
const std::map<RsGxsMessageId, RsGxsMsgMetaData*>& msgsMetaMap =
@ -1657,8 +1663,10 @@ void RsGxsDataAccess::filterMsgList(
keep = checkMsgFilter(opts, msgsMetaMapIt->second);
}
if(keep) ++msgIdIt;
else msgIdIt = msgsIdSet.erase(msgIdIt);
if(keep)
++msgIdIt;
else
msgIdIt = msgsIdSet.erase(msgIdIt);
}
#ifdef DATA_DEBUG

View File

@ -204,7 +204,8 @@ public:
* @param token request token to be redeemed
* @param msgIds
*/
bool getMsgList(const uint32_t &token, GxsMsgIdResult &msgIds);
bool getMsgIdList(const uint32_t &token, GxsMsgIdResult &msgIds);
/*!
* Retrieve msg list for a given token for message related info
@ -382,8 +383,18 @@ private:
* @param req
* @return false if unsuccessful, true otherwise
*/
bool getMsgList(MsgIdReq* req);
bool getMsgIdList(MsgIdReq* req);
/*!
* Attempts to retrieve msg Meta list from data store
* Computationally/CPU-Bandwidth expensive
*
* @param msgIds List of message Ids for the Message Metas to retrieve
* @param opts GxsRequest options
* @param result Map of Meta information for messages
*
*/
bool getMsgMetaDataList( const GxsMsgReq& msgIds, const RsTokReqOptions& opts, GxsMsgMetaResult& result );
/*!
* Attempts to retrieve group meta data from data store
@ -449,7 +460,7 @@ private:
* @param opts the request options set by user
* @param meta The accompanying meta information for msg, ids
*/
void filterMsgList(GxsMsgIdResult& msgIds, const RsTokReqOptions& opts, const MsgMetaFilter& meta) const;
void filterMsgIdList(GxsMsgIdResult& msgIds, const RsTokReqOptions& opts, const MsgMetaFilter& meta) const;
/*!
* This filter msgs based of options supplied (at the moment just status masks)
@ -486,7 +497,7 @@ private:
* @param opts the options used to parameterise the id filter
* @param msgIdsOut the left overs ids after filter is applied to msgIds
*/
bool getMsgList(const GxsMsgReq& msgIds, const RsTokReqOptions& opts, GxsMsgReq& msgIdsOut);
bool getMsgIdList(const GxsMsgReq& msgIds, const RsTokReqOptions& opts, GxsMsgReq& msgIdsOut);
private:
bool locked_clearRequest(const uint32_t &token);