mirror of
https://github.com/RetroShare/RetroShare.git
synced 2024-10-01 02:35:48 -04:00
Merge pull request #1979 from csoler/v0.6-GxsGroup
[WIP] implementing a cache for MsgMeta
This commit is contained in:
commit
a18009413d
@ -110,8 +110,6 @@ const std::string RsGeneralDataService::MSG_META_STATUS = KEY_MSG_STATUS;
|
||||
|
||||
const uint32_t RsGeneralDataService::GXS_MAX_ITEM_SIZE = 1572864; // 1.5 Mbytes
|
||||
|
||||
static const uint32_t CACHE_ENTRY_GRACE_PERIOD = 600 ; // 10 minutes
|
||||
|
||||
static int addColumn(std::list<std::string> &list, const std::string &attribute)
|
||||
{
|
||||
list.push_back(attribute);
|
||||
@ -123,7 +121,6 @@ RsDataService::RsDataService(const std::string &serviceDir, const std::string &d
|
||||
: RsGeneralDataService(), mDbMutex("RsDataService"), mServiceDir(serviceDir), mDbName(dbName), mDbPath(mServiceDir + "/" + dbName), mServType(serviceType), mDb(NULL)
|
||||
{
|
||||
bool isNewDatabase = !RsDirUtil::fileExists(mDbPath);
|
||||
mGrpMetaDataCache_ContainsAllDatabase = false ;
|
||||
|
||||
mDb = new RetroDb(mDbPath, RetroDb::OPEN_READWRITE_CREATE, key);
|
||||
|
||||
@ -488,8 +485,7 @@ bool RsDataService::finishReleaseUpdate(int release, bool result)
|
||||
RsGxsGrpMetaData* RsDataService::locked_getGrpMeta(RetroCursor &c, int colOffset,bool use_cache)
|
||||
{
|
||||
#ifdef RS_DATA_SERVICE_DEBUG
|
||||
std::cerr << "RsDataService::locked_getGrpMeta()";
|
||||
std::cerr << std::endl;
|
||||
std::cerr << "RsDataService::locked_getGrpMeta()" << std::endl;
|
||||
#endif
|
||||
|
||||
bool ok = true;
|
||||
@ -507,20 +503,13 @@ RsGxsGrpMetaData* RsDataService::locked_getGrpMeta(RetroCursor &c, int colOffset
|
||||
RsGxsGroupId grpId(tempId) ;
|
||||
|
||||
if(use_cache)
|
||||
{
|
||||
auto it = mGrpMetaDataCache.find(grpId) ;
|
||||
|
||||
if(it != mGrpMetaDataCache.end())
|
||||
grpMeta = it->second ;
|
||||
else
|
||||
{
|
||||
grpMeta = new RsGxsGrpMetaData();
|
||||
mGrpMetaDataCache[grpId] = grpMeta ;
|
||||
}
|
||||
}
|
||||
grpMeta = mGrpMetaDataCache.getOrCreateMeta(grpId);
|
||||
else
|
||||
grpMeta = new RsGxsGrpMetaData();
|
||||
|
||||
if(!grpMeta->mGroupId.isNull()) // the grpMeta is already initialized because it comes from the cache
|
||||
return grpMeta;
|
||||
|
||||
grpMeta->mGroupId = RsGxsGroupId(tempId);
|
||||
c.getString(mColGrpMeta_NxsIdentity + colOffset, tempId);
|
||||
grpMeta->mAuthorId = RsGxsId(tempId);
|
||||
@ -653,24 +642,38 @@ RsNxsGrp* RsDataService::locked_getGroup(RetroCursor &c)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
RsGxsMsgMetaData* RsDataService::locked_getMsgMeta(RetroCursor &c, int colOffset)
|
||||
RsGxsMsgMetaData* RsDataService::locked_getMsgMeta(RetroCursor &c, int colOffset,bool use_cache)
|
||||
{
|
||||
|
||||
RsGxsMsgMetaData* msgMeta = new RsGxsMsgMetaData();
|
||||
|
||||
bool ok = true;
|
||||
uint32_t data_len = 0,
|
||||
offset = 0;
|
||||
char* data = NULL;
|
||||
|
||||
RsGxsGroupId group_id;
|
||||
RsGxsMessageId msg_id;
|
||||
|
||||
std::string gId;
|
||||
c.getString(mColMsgMeta_GrpId + colOffset, gId);
|
||||
msgMeta->mGroupId = RsGxsGroupId(gId);
|
||||
group_id = RsGxsGroupId(gId);
|
||||
std::string temp;
|
||||
c.getString(mColMsgMeta_MsgId + colOffset, temp);
|
||||
msgMeta->mMsgId = RsGxsMessageId(temp);
|
||||
msg_id = RsGxsMessageId(temp);
|
||||
// without these, a msg is meaningless
|
||||
ok &= (!msgMeta->mGroupId.isNull()) && (!msgMeta->mMsgId.isNull());
|
||||
ok &= (!group_id.isNull()) && (!msg_id.isNull());
|
||||
|
||||
RsGxsMsgMetaData* msgMeta = nullptr;
|
||||
|
||||
if(use_cache)
|
||||
msgMeta = mMsgMetaDataCache[group_id].getOrCreateMeta(msg_id);
|
||||
else
|
||||
msgMeta = new RsGxsMsgMetaData();
|
||||
|
||||
if(!msgMeta->mGroupId.isNull()) // we cannot do that because the cursor needs to advance. Is there a method to skip some data in the db?
|
||||
return msgMeta;
|
||||
|
||||
msgMeta->mGroupId = group_id;
|
||||
msgMeta->mMsgId = msg_id;
|
||||
|
||||
c.getString(mColMsgMeta_OrigMsgId + colOffset, temp);
|
||||
msgMeta->mOrigMsgId = RsGxsMessageId(temp);
|
||||
@ -704,7 +707,7 @@ RsGxsMsgMetaData* RsDataService::locked_getMsgMeta(RetroCursor &c, int colOffset
|
||||
|
||||
if(ok)
|
||||
return msgMeta;
|
||||
else
|
||||
else if(!use_cache)
|
||||
delete msgMeta;
|
||||
|
||||
return NULL;
|
||||
@ -834,7 +837,8 @@ int RsDataService::storeMessage(const std::list<RsNxsMsg*>& msg)
|
||||
|
||||
// This is needed so that mLastPost is correctly updated in the group meta when it is re-loaded.
|
||||
|
||||
locked_clearGrpMetaCache(msgMetaPtr->mGroupId);
|
||||
mGrpMetaDataCache.clear(msgMetaPtr->mGroupId);
|
||||
mMsgMetaDataCache[msgMetaPtr->mGroupId].updateMeta(msgMetaPtr->mMsgId,*msgMetaPtr);
|
||||
}
|
||||
|
||||
// finish transaction
|
||||
@ -926,7 +930,7 @@ int RsDataService::storeGroup(const std::list<RsNxsGrp*>& grp)
|
||||
cv.put(KEY_GRP_STATUS, (int32_t)grpMetaPtr->mGroupStatus);
|
||||
cv.put(KEY_GRP_LAST_POST, (int32_t)grpMetaPtr->mLastPost);
|
||||
|
||||
locked_updateGrpMetaCache(*grpMetaPtr);
|
||||
mGrpMetaDataCache.updateMeta(grpMetaPtr->mGroupId,*grpMetaPtr);
|
||||
|
||||
if (!mDb->sqlInsert(GRP_TABLE_NAME, "", cv))
|
||||
{
|
||||
@ -942,54 +946,6 @@ int RsDataService::storeGroup(const std::list<RsNxsGrp*>& grp)
|
||||
return ret;
|
||||
}
|
||||
|
||||
void RsDataService::locked_updateGrpMetaCache(const RsGxsGrpMetaData& meta)
|
||||
{
|
||||
auto it = mGrpMetaDataCache.find(meta.mGroupId) ;
|
||||
|
||||
if(it != mGrpMetaDataCache.end())
|
||||
*(it->second) = meta ;
|
||||
else
|
||||
mGrpMetaDataCache[meta.mGroupId] = new RsGxsGrpMetaData(meta) ;
|
||||
}
|
||||
|
||||
void RsDataService::locked_clearGrpMetaCache(const RsGxsGroupId& gid)
|
||||
{
|
||||
rstime_t now = time(NULL) ;
|
||||
auto it = mGrpMetaDataCache.find(gid) ;
|
||||
|
||||
// We dont actually delete the item, because it might be used by a calling client.
|
||||
// In this case, the memory will not be used for long, so we keep it into a list for a safe amount
|
||||
// of time and delete it later. Using smart pointers here would be more elegant, but that would need
|
||||
// to be implemented thread safe, which is difficult in this case.
|
||||
|
||||
if(it != mGrpMetaDataCache.end())
|
||||
{
|
||||
#ifdef RS_DATA_SERVICE_DEBUG
|
||||
std::cerr << "(II) moving database cache entry " << (void*)(*it).second << " to dead list." << std::endl;
|
||||
#endif
|
||||
|
||||
mOldCachedItems.push_back(std::make_pair(now,it->second)) ;
|
||||
|
||||
mGrpMetaDataCache.erase(it) ;
|
||||
mGrpMetaDataCache_ContainsAllDatabase = false;
|
||||
}
|
||||
|
||||
// We also take that opportunity to delete old entries.
|
||||
|
||||
auto it2(mOldCachedItems.begin());
|
||||
|
||||
while(it2!=mOldCachedItems.end() && (*it2).first + CACHE_ENTRY_GRACE_PERIOD < now)
|
||||
{
|
||||
#ifdef RS_DATA_SERVICE_DEBUG
|
||||
std::cerr << "(II) deleting old GXS database cache entry " << (void*)(*it2).second << ", " << now - (*it2).first << " seconds old." << std::endl;
|
||||
#endif
|
||||
|
||||
delete (*it2).second ;
|
||||
it2 = mOldCachedItems.erase(it2) ;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
int RsDataService::updateGroup(const std::list<RsNxsGrp *> &grp)
|
||||
{
|
||||
|
||||
@ -1058,7 +1014,7 @@ int RsDataService::updateGroup(const std::list<RsNxsGrp *> &grp)
|
||||
|
||||
mDb->sqlUpdate(GRP_TABLE_NAME, "grpId='" + grpPtr->grpId.toStdString() + "'", cv);
|
||||
|
||||
locked_updateGrpMetaCache(*grpMetaPtr);
|
||||
mGrpMetaDataCache.updateMeta(grpMetaPtr->mGroupId,*grpMetaPtr);
|
||||
}
|
||||
// finish transaction
|
||||
bool ret = mDb->commitTransaction();
|
||||
@ -1275,7 +1231,7 @@ void RsDataService::locked_retrieveMessages(RetroCursor *c, std::vector<RsNxsMsg
|
||||
|
||||
if(m){
|
||||
if (metaOffset) {
|
||||
m->metaData = locked_getMsgMeta(*c, metaOffset);
|
||||
m->metaData = locked_getMsgMeta(*c, metaOffset,false);
|
||||
}
|
||||
msgs.push_back(m);
|
||||
}
|
||||
@ -1285,7 +1241,7 @@ void RsDataService::locked_retrieveMessages(RetroCursor *c, std::vector<RsNxsMsg
|
||||
return;
|
||||
}
|
||||
|
||||
int RsDataService::retrieveGxsMsgMetaData(const GxsMsgReq& reqIds, GxsMsgMetaResult &msgMeta)
|
||||
int RsDataService::retrieveGxsMsgMetaData(const GxsMsgReq& reqIds, GxsMsgMetaResult& msgMeta)
|
||||
{
|
||||
RsStackMutex stack(mDbMutex);
|
||||
|
||||
@ -1294,52 +1250,65 @@ int RsDataService::retrieveGxsMsgMetaData(const GxsMsgReq& reqIds, GxsMsgMetaRes
|
||||
int resultCount = 0;
|
||||
#endif
|
||||
|
||||
GxsMsgReq::const_iterator mit = reqIds.begin();
|
||||
|
||||
for(; mit != reqIds.end(); ++mit)
|
||||
for(auto mit(reqIds.begin()); mit != reqIds.end(); ++mit)
|
||||
{
|
||||
|
||||
const RsGxsGroupId& grpId = mit->first;
|
||||
const std::set<RsGxsMessageId>& msgIdV = mit->second;
|
||||
|
||||
// if vector empty then request all messages
|
||||
const std::set<RsGxsMessageId>& msgIdV = mit->second;
|
||||
std::vector<RsGxsMsgMetaData*> metaSet;
|
||||
|
||||
if(msgIdV.empty()){
|
||||
RetroCursor* c = mDb->sqlQuery(MSG_TABLE_NAME, mMsgMetaColumns, KEY_GRP_ID+ "='" + grpId.toStdString() + "'", "");
|
||||
t_MetaDataCache<RsGxsMessageId,RsGxsMsgMetaData>& cache(mMsgMetaDataCache[grpId]);
|
||||
|
||||
if (c)
|
||||
{
|
||||
locked_retrieveMsgMeta(c, metaSet);
|
||||
if(msgIdV.empty())
|
||||
{
|
||||
if(cache.isCacheUpToDate())
|
||||
cache.getFullMetaList(msgMeta[grpId]);
|
||||
else
|
||||
{
|
||||
RetroCursor* c = mDb->sqlQuery(MSG_TABLE_NAME, mMsgMetaColumns, KEY_GRP_ID+ "='" + grpId.toStdString() + "'", "");
|
||||
|
||||
if (c)
|
||||
{
|
||||
locked_retrieveMsgMetaList(c, msgMeta[grpId]);
|
||||
cache.setCacheUpToDate(true);
|
||||
}
|
||||
delete c;
|
||||
}
|
||||
#ifdef RS_DATA_SERVICE_DEBUG_CACHE
|
||||
std::cerr << mDbName << ": Retrieving (all) Msg metadata grpId=" << grpId << ", " << std::dec << metaSet.size() << " messages" << std::endl;
|
||||
std::cerr << mDbName << ": Retrieving (all) Msg metadata grpId=" << grpId << ", " << std::dec << metaSet.size() << " messages" << std::endl;
|
||||
#endif
|
||||
}
|
||||
}else{
|
||||
|
||||
// request each grp
|
||||
std::set<RsGxsMessageId>::const_iterator sit = msgIdV.begin();
|
||||
|
||||
for(; sit!=msgIdV.end(); ++sit){
|
||||
const RsGxsMessageId& msgId = *sit;
|
||||
RetroCursor* c = mDb->sqlQuery(MSG_TABLE_NAME, mMsgMetaColumns, KEY_GRP_ID+ "='" + grpId.toStdString()
|
||||
+ "' AND " + KEY_MSG_ID + "='" + msgId.toStdString() + "'", "");
|
||||
|
||||
if (c)
|
||||
{
|
||||
locked_retrieveMsgMeta(c, metaSet);
|
||||
#ifdef RS_DATA_SERVICE_DEBUG_CACHE
|
||||
std::cerr << mDbName << ": Retrieving Msg metadata grpId=" << grpId << ", " << std::dec << metaSet.size() << " messages" << std::endl;
|
||||
#endif
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// request each msg meta
|
||||
auto& metaSet(msgMeta[grpId]);
|
||||
|
||||
#ifdef RS_DATA_SERVICE_DEBUG_TIME
|
||||
resultCount += metaSet.size();
|
||||
for(auto sit(msgIdV.begin()); sit!=msgIdV.end(); ++sit)
|
||||
{
|
||||
const RsGxsMessageId& msgId = *sit;
|
||||
|
||||
RsGxsMsgMetaData *meta = cache.getMeta(msgId);
|
||||
|
||||
if(meta)
|
||||
metaSet.push_back(meta);
|
||||
else
|
||||
{
|
||||
RetroCursor* c = mDb->sqlQuery(MSG_TABLE_NAME, mMsgMetaColumns, KEY_GRP_ID+ "='" + grpId.toStdString() + "' AND " + KEY_MSG_ID + "='" + msgId.toStdString() + "'", "");
|
||||
|
||||
c->moveToFirst();
|
||||
RsGxsMsgMetaData* meta = locked_getMsgMeta(*c, 0,true);
|
||||
|
||||
if(meta)
|
||||
metaSet.push_back(meta);
|
||||
|
||||
delete c;
|
||||
}
|
||||
}
|
||||
#ifdef RS_DATA_SERVICE_DEBUG_CACHE
|
||||
std::cerr << mDbName << ": Retrieving Msg metadata grpId=" << grpId << ", " << std::dec << metaSet.size() << " messages" << std::endl;
|
||||
#endif
|
||||
|
||||
msgMeta[grpId] = metaSet;
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef RS_DATA_SERVICE_DEBUG_TIME
|
||||
@ -1350,22 +1319,43 @@ int RsDataService::retrieveGxsMsgMetaData(const GxsMsgReq& reqIds, GxsMsgMetaRes
|
||||
return 1;
|
||||
}
|
||||
|
||||
void RsDataService::locked_retrieveMsgMeta(RetroCursor *c, std::vector<RsGxsMsgMetaData *> &msgMeta)
|
||||
void RsDataService::locked_retrieveGrpMetaList(RetroCursor *c, std::map<RsGxsGroupId,RsGxsGrpMetaData *>& grpMeta)
|
||||
{
|
||||
if(!c)
|
||||
{
|
||||
RsErr() << __PRETTY_FUNCTION__ << ": attempt to retrieve Group Meta data from the DB with null cursor!" << std::endl;
|
||||
return;
|
||||
}
|
||||
|
||||
if(c)
|
||||
{
|
||||
bool valid = c->moveToFirst();
|
||||
while(valid){
|
||||
RsGxsMsgMetaData* m = locked_getMsgMeta(*c, 0);
|
||||
bool valid = c->moveToFirst();
|
||||
|
||||
if(m != NULL)
|
||||
msgMeta.push_back(m);
|
||||
while(valid)
|
||||
{
|
||||
RsGxsGrpMetaData* m = locked_getGrpMeta(*c, 0,true);
|
||||
|
||||
valid = c->moveToNext();
|
||||
}
|
||||
delete c;
|
||||
}
|
||||
if(m)
|
||||
grpMeta[m->mGroupId] = m;
|
||||
|
||||
valid = c->moveToNext();
|
||||
}
|
||||
}
|
||||
void RsDataService::locked_retrieveMsgMetaList(RetroCursor *c, std::vector<const RsGxsMsgMetaData *>& msgMeta)
|
||||
{
|
||||
if(!c)
|
||||
{
|
||||
RsErr() << __PRETTY_FUNCTION__ << ": attempt to retrieve Msg Meta data from the DB with null cursor!" << std::endl;
|
||||
return;
|
||||
}
|
||||
|
||||
bool valid = c->moveToFirst();
|
||||
while(valid){
|
||||
const RsGxsMsgMetaData* m = locked_getMsgMeta(*c, 0,true);
|
||||
|
||||
if(m != NULL)
|
||||
msgMeta.push_back(m);
|
||||
|
||||
valid = c->moveToNext();
|
||||
}
|
||||
}
|
||||
|
||||
int RsDataService::retrieveGxsGrpMetaData(RsGxsGrpMetaTemporaryMap& grp)
|
||||
@ -1385,13 +1375,13 @@ int RsDataService::retrieveGxsGrpMetaData(RsGxsGrpMetaTemporaryMap& grp)
|
||||
|
||||
if(grp.empty())
|
||||
{
|
||||
if(mGrpMetaDataCache_ContainsAllDatabase) // grab all the stash from the cache, so as to avoid decryption costs.
|
||||
if(mGrpMetaDataCache.isCacheUpToDate()) // grab all the stash from the cache, so as to avoid decryption costs.
|
||||
{
|
||||
#ifdef RS_DATA_SERVICE_DEBUG_CACHE
|
||||
std::cerr << (void*)this << ": RsDataService::retrieveGxsGrpMetaData() retrieving all from cache!" << std::endl;
|
||||
#endif
|
||||
|
||||
grp = mGrpMetaDataCache ;
|
||||
mGrpMetaDataCache.getFullMetaList(grp) ;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -1402,93 +1392,103 @@ int RsDataService::retrieveGxsGrpMetaData(RsGxsGrpMetaTemporaryMap& grp)
|
||||
|
||||
RetroCursor* c = mDb->sqlQuery(GRP_TABLE_NAME, mGrpMetaColumns, "", "");
|
||||
|
||||
if(c)
|
||||
if(c)
|
||||
{
|
||||
bool valid = c->moveToFirst();
|
||||
locked_retrieveGrpMetaList(c,grp);
|
||||
|
||||
while(valid)
|
||||
{
|
||||
RsGxsGrpMetaData* g = locked_getGrpMeta(*c, 0,true);
|
||||
|
||||
if(g)
|
||||
{
|
||||
grp[g->mGroupId] = g;
|
||||
#ifdef RS_DATA_SERVICE_DEBUG_CACHE
|
||||
std::cerr << (void *)this << " " << mDbName << ": Retrieving (all) Grp metadata grpId=" << g->mGroupId << std::endl;
|
||||
#endif
|
||||
}
|
||||
valid = c->moveToNext();
|
||||
|
||||
#ifdef RS_DATA_SERVICE_DEBUG_TIME
|
||||
++resultCount;
|
||||
#endif
|
||||
}
|
||||
delete c;
|
||||
mGrpMetaDataCache.setCacheUpToDate(true);
|
||||
}
|
||||
delete c;
|
||||
#ifdef RS_DATA_SERVICE_DEBUG_TIME
|
||||
resultCount += grp.size();
|
||||
#endif
|
||||
|
||||
mGrpMetaDataCache_ContainsAllDatabase = true ;
|
||||
// if(c)
|
||||
// {
|
||||
// bool valid = c->moveToFirst();
|
||||
//
|
||||
// while(valid)
|
||||
// {
|
||||
// RsGxsGrpMetaData* g = locked_getGrpMeta(*c, 0,true);
|
||||
//
|
||||
// if(g)
|
||||
// {
|
||||
// grp[g->mGroupId] = g;
|
||||
//#ifdef RS_DATA_SERVICE_DEBUG_CACHE
|
||||
// std::cerr << (void *)this << " " << mDbName << ": Retrieving (all) Grp metadata grpId=" << g->mGroupId << std::endl;
|
||||
//#endif
|
||||
// }
|
||||
// valid = c->moveToNext();
|
||||
//
|
||||
// }
|
||||
// delete c;
|
||||
// }
|
||||
}
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
std::map<RsGxsGroupId, RsGxsGrpMetaData *>::iterator mit = grp.begin();
|
||||
{
|
||||
for(auto mit(grp.begin()); mit != grp.end(); ++mit)
|
||||
{
|
||||
RsGxsGrpMetaData *meta = mGrpMetaDataCache.getMeta(mit->first) ;
|
||||
|
||||
for(; mit != grp.end(); ++mit)
|
||||
{
|
||||
std::map<RsGxsGroupId, RsGxsGrpMetaData*>::const_iterator itt = mGrpMetaDataCache.find(mit->first) ;
|
||||
|
||||
if(itt != mGrpMetaDataCache.end())
|
||||
{
|
||||
if(meta)
|
||||
grp[mit->first] = meta;
|
||||
else
|
||||
{
|
||||
#ifdef RS_DATA_SERVICE_DEBUG_CACHE
|
||||
std::cerr << mDbName << ": Retrieving Grp metadata grpId=" << mit->first << " from cache!" << std::endl;
|
||||
#endif
|
||||
grp[mit->first] = itt->second ;
|
||||
}
|
||||
else
|
||||
{
|
||||
#ifdef RS_DATA_SERVICE_DEBUG_CACHE
|
||||
std::cerr << mDbName << ": Retrieving Grp metadata grpId=" << mit->first ;
|
||||
std::cerr << mDbName << ": Retrieving Grp metadata grpId=" << mit->first ;
|
||||
#endif
|
||||
|
||||
const RsGxsGroupId& grpId = mit->first;
|
||||
RetroCursor* c = mDb->sqlQuery(GRP_TABLE_NAME, mGrpMetaColumns, "grpId='" + grpId.toStdString() + "'", "");
|
||||
const RsGxsGroupId& grpId = mit->first;
|
||||
RetroCursor* c = mDb->sqlQuery(GRP_TABLE_NAME, mGrpMetaColumns, "grpId='" + grpId.toStdString() + "'", "");
|
||||
|
||||
if(c)
|
||||
{
|
||||
bool valid = c->moveToFirst();
|
||||
c->moveToFirst();
|
||||
RsGxsGrpMetaData* meta = locked_getGrpMeta(*c, 0,true);
|
||||
|
||||
#ifdef RS_DATA_SERVICE_DEBUG_CACHE
|
||||
if(!valid)
|
||||
std::cerr << " Empty query! GrpId " << grpId << " is not in database" << std::endl;
|
||||
#endif
|
||||
while(valid)
|
||||
{
|
||||
RsGxsGrpMetaData* g = locked_getGrpMeta(*c, 0,true);
|
||||
|
||||
if(g)
|
||||
{
|
||||
grp[g->mGroupId] = g;
|
||||
#ifdef RS_DATA_SERVICE_DEBUG_CACHE
|
||||
std::cerr << ". Got it. Updating cache." << std::endl;
|
||||
#endif
|
||||
}
|
||||
valid = c->moveToNext();
|
||||
if(meta)
|
||||
grp[mit->first] = meta;
|
||||
|
||||
#ifdef RS_DATA_SERVICE_DEBUG_TIME
|
||||
++resultCount;
|
||||
++resultCount;
|
||||
#endif
|
||||
}
|
||||
delete c;
|
||||
}
|
||||
#ifdef RS_DATA_SERVICE_DEBUG_CACHE
|
||||
else
|
||||
std::cerr << ". not found!" << std::endl;
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
delete c;
|
||||
|
||||
// if(c)
|
||||
// {
|
||||
// bool valid = c->moveToFirst();
|
||||
//
|
||||
//#ifdef RS_DATA_SERVICE_DEBUG_CACHE
|
||||
// if(!valid)
|
||||
// std::cerr << " Empty query! GrpId " << grpId << " is not in database" << std::endl;
|
||||
//#endif
|
||||
// while(valid)
|
||||
// {
|
||||
// RsGxsGrpMetaData* g = locked_getGrpMeta(*c, 0,true);
|
||||
//
|
||||
// if(g)
|
||||
// {
|
||||
// grp[g->mGroupId] = g;
|
||||
//#ifdef RS_DATA_SERVICE_DEBUG_CACHE
|
||||
// std::cerr << ". Got it. Updating cache." << std::endl;
|
||||
//#endif
|
||||
// }
|
||||
// valid = c->moveToNext();
|
||||
//
|
||||
//#ifdef RS_DATA_SERVICE_DEBUG_TIME
|
||||
// ++resultCount;
|
||||
//#endif
|
||||
// }
|
||||
// delete c;
|
||||
// }
|
||||
#ifdef RS_DATA_SERVICE_DEBUG_CACHE
|
||||
else
|
||||
std::cerr << ". not found!" << std::endl;
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#ifdef RS_DATA_SERVICE_DEBUG_TIME
|
||||
std::cerr << "RsDataService::retrieveGxsGrpMetaData() " << mDbName << ", Requests: " << requestedGroups << ", Results: " << resultCount << ", Time: " << timer.duration() << std::endl;
|
||||
@ -1525,35 +1525,37 @@ int RsDataService::resetDataStore()
|
||||
return 1;
|
||||
}
|
||||
|
||||
int RsDataService::updateGroupMetaData(GrpLocMetaData &meta)
|
||||
int RsDataService::updateGroupMetaData(const GrpLocMetaData& meta)
|
||||
{
|
||||
#ifdef RS_DATA_SERVICE_DEBUG_CACHE
|
||||
std::cerr << (void*)this << ": Updating Grp Meta data: grpId = " << meta.grpId << std::endl;
|
||||
#endif
|
||||
|
||||
RsStackMutex stack(mDbMutex);
|
||||
RsGxsGroupId& grpId = meta.grpId;
|
||||
const RsGxsGroupId& grpId = meta.grpId;
|
||||
|
||||
#ifdef RS_DATA_SERVICE_DEBUG_CACHE
|
||||
std::cerr << (void*)this << ": erasing old entry from cache." << std::endl;
|
||||
#endif
|
||||
|
||||
locked_clearGrpMetaCache(meta.grpId);
|
||||
mGrpMetaDataCache.clear(meta.grpId);
|
||||
|
||||
return mDb->sqlUpdate(GRP_TABLE_NAME, KEY_GRP_ID+ "='" + grpId.toStdString() + "'", meta.val) ? 1 : 0;
|
||||
}
|
||||
|
||||
int RsDataService::updateMessageMetaData(MsgLocMetaData &metaData)
|
||||
int RsDataService::updateMessageMetaData(const MsgLocMetaData& metaData)
|
||||
{
|
||||
#ifdef RS_DATA_SERVICE_DEBUG_CACHE
|
||||
std::cerr << (void*)this << ": Updating Msg Meta data: grpId = " << metaData.msgId.first << " msgId = " << metaData.msgId.second << std::endl;
|
||||
#endif
|
||||
|
||||
RsStackMutex stack(mDbMutex);
|
||||
RsGxsGroupId& grpId = metaData.msgId.first;
|
||||
RsGxsMessageId& msgId = metaData.msgId.second;
|
||||
return mDb->sqlUpdate(MSG_TABLE_NAME, KEY_GRP_ID+ "='" + grpId.toStdString()
|
||||
+ "' AND " + KEY_MSG_ID + "='" + msgId.toStdString() + "'", metaData.val) ? 1 : 0;
|
||||
const RsGxsGroupId& grpId = metaData.msgId.first;
|
||||
const RsGxsMessageId& msgId = metaData.msgId.second;
|
||||
|
||||
mMsgMetaDataCache[grpId].clear(msgId);
|
||||
|
||||
return mDb->sqlUpdate(MSG_TABLE_NAME, KEY_GRP_ID+ "='" + grpId.toStdString() + "' AND " + KEY_MSG_ID + "='" + msgId.toStdString() + "'", metaData.val) ? 1 : 0;
|
||||
}
|
||||
|
||||
int RsDataService::removeMsgs(const GxsMsgReq& msgIds)
|
||||
@ -1678,13 +1680,13 @@ bool RsDataService::locked_removeMessageEntries(const GxsMsgReq& msgIds)
|
||||
{
|
||||
const RsGxsGroupId& grpId = mit->first;
|
||||
const std::set<RsGxsMessageId>& msgsV = mit->second;
|
||||
std::set<RsGxsMessageId>::const_iterator vit = msgsV.begin();
|
||||
auto& cache(mMsgMetaDataCache[grpId]);
|
||||
|
||||
for(; vit != msgsV.end(); ++vit)
|
||||
for(auto& msgId:msgsV)
|
||||
{
|
||||
const RsGxsMessageId& msgId = *vit;
|
||||
mDb->sqlDelete(MSG_TABLE_NAME, KEY_GRP_ID+ "='" + grpId.toStdString()
|
||||
+ "' AND " + KEY_MSG_ID + "='" + msgId.toStdString() + "'", "");
|
||||
mDb->sqlDelete(MSG_TABLE_NAME, KEY_GRP_ID+ "='" + grpId.toStdString() + "' AND " + KEY_MSG_ID + "='" + msgId.toStdString() + "'", "");
|
||||
|
||||
cache.clear(msgId);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1698,23 +1700,18 @@ bool RsDataService::locked_removeGroupEntries(const std::vector<RsGxsGroupId>& g
|
||||
// start a transaction
|
||||
bool ret = mDb->beginTransaction();
|
||||
|
||||
std::vector<RsGxsGroupId>::const_iterator vit = grpIds.begin();
|
||||
|
||||
for(; vit != grpIds.end(); ++vit)
|
||||
for(auto grpId:grpIds)
|
||||
{
|
||||
|
||||
const RsGxsGroupId& grpId = *vit;
|
||||
mDb->sqlDelete(GRP_TABLE_NAME, KEY_GRP_ID+ "='" + grpId.toStdString() + "'", "");
|
||||
|
||||
// also remove the group meta from cache.
|
||||
locked_clearGrpMetaCache(*vit) ;
|
||||
mGrpMetaDataCache.clear(grpId) ;
|
||||
}
|
||||
|
||||
ret &= mDb->commitTransaction();
|
||||
|
||||
mGrpMetaDataCache_ContainsAllDatabase = false ;
|
||||
return ret;
|
||||
}
|
||||
|
||||
uint32_t RsDataService::cacheSize() const {
|
||||
return 0;
|
||||
}
|
||||
@ -1724,3 +1721,38 @@ int RsDataService::setCacheSize(uint32_t /* size */)
|
||||
return 0;
|
||||
}
|
||||
|
||||
void RsDataService::debug_printCacheSize() const
|
||||
{
|
||||
uint32_t nb_items,nb_items_on_deadlist;
|
||||
uint64_t total_size,total_size_of_deadlist;
|
||||
|
||||
mGrpMetaDataCache.debug_computeSize(nb_items, nb_items_on_deadlist, total_size,total_size_of_deadlist);
|
||||
|
||||
RsDbg() << "Cache size: " << std::endl;
|
||||
RsDbg() << " Groups: " << " total: " << nb_items << " (dead: " << nb_items_on_deadlist << "), size: " << total_size << " (Dead: " << total_size_of_deadlist << ")" << std::endl;
|
||||
|
||||
nb_items = 0,nb_items_on_deadlist = 0;
|
||||
total_size = 0,total_size_of_deadlist = 0;
|
||||
|
||||
for(auto it:mMsgMetaDataCache)
|
||||
{
|
||||
uint32_t tmp_nb_items,tmp_nb_items_on_deadlist;
|
||||
uint64_t tmp_total_size,tmp_total_size_of_deadlist;
|
||||
|
||||
it.second.debug_computeSize(tmp_nb_items, tmp_nb_items_on_deadlist, tmp_total_size,tmp_total_size_of_deadlist);
|
||||
|
||||
nb_items += tmp_nb_items;
|
||||
nb_items_on_deadlist += tmp_nb_items_on_deadlist;
|
||||
total_size += tmp_total_size;
|
||||
total_size_of_deadlist += tmp_total_size_of_deadlist;
|
||||
}
|
||||
RsDbg() << " Msgs: " << " total: " << nb_items << " (dead: " << nb_items_on_deadlist << "), size: " << total_size << " (Dead: " << total_size_of_deadlist << ")" << std::endl;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
@ -35,6 +35,116 @@ public:
|
||||
ContentValue cv;
|
||||
};
|
||||
|
||||
template<class ID, class MetaDataClass> class t_MetaDataCache
|
||||
{
|
||||
public:
|
||||
t_MetaDataCache() : mCache_ContainsAllMetas(false) {}
|
||||
|
||||
bool isCacheUpToDate() const { return mCache_ContainsAllMetas ; }
|
||||
void setCacheUpToDate(bool b) { mCache_ContainsAllMetas = b; }
|
||||
|
||||
void getFullMetaList(std::map<ID,MetaDataClass*>& mp) const { mp = mMetas ; }
|
||||
void getFullMetaList(std::vector<const MetaDataClass*>& mp) const { for(auto& m:mMetas) mp.push_back(m.second) ; }
|
||||
|
||||
MetaDataClass *getMeta(const ID& id)
|
||||
{
|
||||
auto itt = mMetas.find(id);
|
||||
|
||||
if(itt != mMetas.end())
|
||||
return itt->second ;
|
||||
else
|
||||
return NULL;
|
||||
}
|
||||
|
||||
MetaDataClass *getOrCreateMeta(const ID& id)
|
||||
{
|
||||
MetaDataClass *meta = nullptr;
|
||||
auto it = mMetas.find(id) ;
|
||||
|
||||
if(it != mMetas.end())
|
||||
{
|
||||
#ifdef RS_DATA_SERVICE_DEBUG
|
||||
RsDbg() << __PRETTY_FUNCTION__ << ": getting group meta " << grpId << " from cache." << std::endl;
|
||||
#endif
|
||||
meta = it->second ;
|
||||
}
|
||||
else
|
||||
{
|
||||
#ifdef RS_DATA_SERVICE_DEBUG
|
||||
RsDbg() << __PRETTY_FUNCTION__ << ": group meta " << grpId << " not in cache. Loading it from DB..." << std::endl;
|
||||
#endif
|
||||
meta = new MetaDataClass();
|
||||
mMetas[id] = meta ;
|
||||
}
|
||||
|
||||
return meta;
|
||||
}
|
||||
|
||||
void updateMeta(const ID& id,const MetaDataClass& meta)
|
||||
{
|
||||
auto it = mMetas.find(id) ;
|
||||
|
||||
if(it != mMetas.end())
|
||||
*(it->second) = meta ;
|
||||
else
|
||||
mMetas[id] = new MetaDataClass(meta) ;
|
||||
}
|
||||
|
||||
void clear(const ID& id)
|
||||
{
|
||||
rstime_t now = time(NULL) ;
|
||||
auto it = mMetas.find(id) ;
|
||||
|
||||
// We dont actually delete the item, because it might be used by a calling client.
|
||||
// In this case, the memory will not be used for long, so we keep it into a list for a safe amount
|
||||
// of time and delete it later. Using smart pointers here would be more elegant, but that would need
|
||||
// to be implemented thread safe, which is difficult in this case.
|
||||
|
||||
if(it != mMetas.end())
|
||||
{
|
||||
#ifdef RS_DATA_SERVICE_DEBUG
|
||||
std::cerr << "(II) moving database cache entry " << (void*)(*it).second << " to dead list." << std::endl;
|
||||
#endif
|
||||
|
||||
mOldCachedItems.push_back(std::make_pair(now,it->second)) ;
|
||||
|
||||
mMetas.erase(it) ;
|
||||
mCache_ContainsAllMetas = false;
|
||||
}
|
||||
|
||||
// We also take that opportunity to delete old entries.
|
||||
|
||||
auto it2(mOldCachedItems.begin());
|
||||
|
||||
while(it2!=mOldCachedItems.end() && (*it2).first + CACHE_ENTRY_GRACE_PERIOD < now)
|
||||
{
|
||||
#ifdef RS_DATA_SERVICE_DEBUG
|
||||
std::cerr << "(II) deleting old GXS database cache entry " << (void*)(*it2).second << ", " << now - (*it2).first << " seconds old." << std::endl;
|
||||
#endif
|
||||
delete (*it2).second ;
|
||||
it2 = mOldCachedItems.erase(it2) ;
|
||||
}
|
||||
}
|
||||
|
||||
void debug_computeSize(uint32_t& nb_items, uint32_t& nb_items_on_deadlist, uint64_t& total_size,uint64_t& total_size_of_deadlist) const
|
||||
{
|
||||
nb_items = mMetas.size();
|
||||
nb_items_on_deadlist = mOldCachedItems.size();
|
||||
total_size = 0;
|
||||
total_size_of_deadlist = 0;
|
||||
|
||||
for(auto it:mMetas) total_size += it.second->serial_size();
|
||||
for(auto it:mOldCachedItems) total_size_of_deadlist += it.second->serial_size();
|
||||
}
|
||||
private:
|
||||
std::map<ID,MetaDataClass*> mMetas;
|
||||
std::list<std::pair<rstime_t,MetaDataClass*> > mOldCachedItems ; // dead list, where items get deleted after being unused for a while. This is due to not using smart ptrs.
|
||||
|
||||
static const uint32_t CACHE_ENTRY_GRACE_PERIOD = 600 ; // Unused items are deleted 10 minutes after last usage.
|
||||
|
||||
bool mCache_ContainsAllMetas ;
|
||||
};
|
||||
|
||||
class RsDataService : public RsGeneralDataService
|
||||
{
|
||||
public:
|
||||
@ -147,13 +257,13 @@ public:
|
||||
* @param metaData The meta data item to update
|
||||
* @return error code
|
||||
*/
|
||||
int updateMessageMetaData(MsgLocMetaData& metaData);
|
||||
int updateMessageMetaData(const MsgLocMetaData& metaData);
|
||||
|
||||
/*!
|
||||
* @param metaData The meta data item to update
|
||||
* @return error code
|
||||
*/
|
||||
int updateGroupMetaData(GrpLocMetaData& meta);
|
||||
int updateGroupMetaData(const GrpLocMetaData &meta);
|
||||
|
||||
/*!
|
||||
* Completely clear out data stored in
|
||||
@ -174,6 +284,8 @@ public:
|
||||
|
||||
int updateGroupKeys(const RsGxsGroupId& grpId,const RsTlvSecurityKeySet& keys, uint32_t subscribe_flags) ;
|
||||
|
||||
void debug_printCacheSize() const;
|
||||
|
||||
private:
|
||||
|
||||
/*!
|
||||
@ -194,15 +306,22 @@ private:
|
||||
/*!
|
||||
* Retrieves all the msg meta results from a cursor
|
||||
* @param c cursor to result set
|
||||
* @param metaSet message metadata retrieved from cursor are stored here
|
||||
* @param msgMeta message metadata retrieved from cursor are stored here
|
||||
*/
|
||||
void locked_retrieveMsgMeta(RetroCursor* c, std::vector<RsGxsMsgMetaData*>& msgMeta);
|
||||
void locked_retrieveMsgMetaList(RetroCursor* c, std::vector<const RsGxsMsgMetaData*>& msgMeta);
|
||||
|
||||
/*!
|
||||
* Retrieves all the grp meta results from a cursor
|
||||
* @param c cursor to result set
|
||||
* @param grpMeta group metadata retrieved from cursor are stored here
|
||||
*/
|
||||
void locked_retrieveGrpMetaList(RetroCursor *c, std::map<RsGxsGroupId,RsGxsGrpMetaData *>& grpMeta);
|
||||
|
||||
/*!
|
||||
* extracts a msg meta item from a cursor at its
|
||||
* current position
|
||||
*/
|
||||
RsGxsMsgMetaData* locked_getMsgMeta(RetroCursor& c, int colOffset);
|
||||
RsGxsMsgMetaData* locked_getMsgMeta(RetroCursor& c, int colOffset, bool use_cache);
|
||||
|
||||
/*!
|
||||
* extracts a grp meta item from a cursor at its
|
||||
@ -348,10 +467,8 @@ private:
|
||||
void locked_clearGrpMetaCache(const RsGxsGroupId& gid);
|
||||
void locked_updateGrpMetaCache(const RsGxsGrpMetaData& meta);
|
||||
|
||||
std::map<RsGxsGroupId,RsGxsGrpMetaData*> mGrpMetaDataCache ;
|
||||
std::list<std::pair<rstime_t,RsGxsGrpMetaData*> > mOldCachedItems ;
|
||||
|
||||
bool mGrpMetaDataCache_ContainsAllDatabase ;
|
||||
t_MetaDataCache<RsGxsGroupId,RsGxsGrpMetaData> mGrpMetaDataCache;
|
||||
std::map<RsGxsGroupId,t_MetaDataCache<RsGxsMessageId,RsGxsMsgMetaData> > mMsgMetaDataCache;
|
||||
};
|
||||
|
||||
#endif // RSDATASERVICE_H
|
||||
|
@ -239,12 +239,12 @@ public:
|
||||
/*!
|
||||
* @param metaData
|
||||
*/
|
||||
virtual int updateMessageMetaData(MsgLocMetaData& metaData) = 0;
|
||||
virtual int updateMessageMetaData(const MsgLocMetaData& metaData) = 0;
|
||||
|
||||
/*!
|
||||
* @param metaData
|
||||
*/
|
||||
virtual int updateGroupMetaData(GrpLocMetaData& meta) = 0;
|
||||
virtual int updateGroupMetaData(const GrpLocMetaData& meta) = 0;
|
||||
|
||||
virtual int updateGroupKeys(const RsGxsGroupId& grpId,const RsTlvSecurityKeySet& keys,uint32_t subscribed_flags) = 0 ;
|
||||
|
||||
|
@ -1274,18 +1274,18 @@ bool RsGenExchange::getMsgMeta(const uint32_t &token,
|
||||
|
||||
for(; mit != result.end(); ++mit)
|
||||
{
|
||||
std::vector<RsGxsMsgMetaData*>& metaV = mit->second;
|
||||
std::vector<const RsGxsMsgMetaData*>& metaV = mit->second;
|
||||
|
||||
std::vector<RsMsgMetaData>& msgInfoV = msgInfo[mit->first];
|
||||
|
||||
std::vector<RsGxsMsgMetaData*>::iterator vit = metaV.begin();
|
||||
std::vector<const RsGxsMsgMetaData*>::iterator vit = metaV.begin();
|
||||
RsMsgMetaData meta;
|
||||
for(; vit != metaV.end(); ++vit)
|
||||
{
|
||||
RsGxsMsgMetaData& m = *(*vit);
|
||||
const RsGxsMsgMetaData& m = *(*vit);
|
||||
meta = m;
|
||||
msgInfoV.push_back(meta);
|
||||
delete *vit;
|
||||
//delete *vit;
|
||||
}
|
||||
metaV.clear();
|
||||
}
|
||||
@ -1302,18 +1302,18 @@ bool RsGenExchange::getMsgRelatedMeta(const uint32_t &token, GxsMsgRelatedMetaMa
|
||||
|
||||
for(; mit != result.end(); ++mit)
|
||||
{
|
||||
std::vector<RsGxsMsgMetaData*>& metaV = mit->second;
|
||||
std::vector<const RsGxsMsgMetaData*>& metaV = mit->second;
|
||||
|
||||
std::vector<RsMsgMetaData>& msgInfoV = msgMeta[mit->first];
|
||||
|
||||
std::vector<RsGxsMsgMetaData*>::iterator vit = metaV.begin();
|
||||
std::vector<const RsGxsMsgMetaData*>::iterator vit = metaV.begin();
|
||||
RsMsgMetaData meta;
|
||||
for(; vit != metaV.end(); ++vit)
|
||||
{
|
||||
RsGxsMsgMetaData& m = *(*vit);
|
||||
const RsGxsMsgMetaData& m = *(*vit);
|
||||
meta = m;
|
||||
msgInfoV.push_back(meta);
|
||||
delete *vit;
|
||||
//delete *vit;
|
||||
}
|
||||
metaV.clear();
|
||||
}
|
||||
@ -2016,15 +2016,15 @@ void RsGenExchange::processMsgMetaChanges()
|
||||
|
||||
if(mit != result.end())
|
||||
{
|
||||
std::vector<RsGxsMsgMetaData*>& msgMetaV = mit->second;
|
||||
std::vector<const RsGxsMsgMetaData*>& msgMetaV = mit->second;
|
||||
|
||||
if(!msgMetaV.empty())
|
||||
{
|
||||
RsGxsMsgMetaData* meta = *(msgMetaV.begin());
|
||||
const RsGxsMsgMetaData* meta = *(msgMetaV.begin());
|
||||
value = (meta->mMsgStatus & ~mask) | (mask & value);
|
||||
changed = (static_cast<int64_t>(meta->mMsgStatus) != value);
|
||||
m.val.put(RsGeneralDataService::MSG_META_STATUS, value);
|
||||
delete meta;
|
||||
//delete meta;
|
||||
ok = true;
|
||||
}
|
||||
}
|
||||
|
@ -29,8 +29,8 @@
|
||||
|
||||
/* data types used throughout Gxs from netservice to genexchange */
|
||||
|
||||
typedef std::map<RsGxsGroupId, std::vector<RsGxsMsgMetaData*> > GxsMsgMetaResult;
|
||||
typedef std::map<RsGxsGrpMsgIdPair, std::vector<RsGxsMsgMetaData*> > MsgRelatedMetaResult;
|
||||
typedef std::map<RsGxsGroupId, std::vector<const RsGxsMsgMetaData*> > GxsMsgMetaResult;
|
||||
typedef std::map<RsGxsGrpMsgIdPair, std::vector<const RsGxsMsgMetaData*> > MsgRelatedMetaResult;
|
||||
|
||||
// Default values that are used throughout GXS code
|
||||
|
||||
|
@ -209,7 +209,7 @@ RsGxsMsgMetaData::~RsGxsMsgMetaData(){
|
||||
return;
|
||||
}
|
||||
|
||||
uint32_t RsGxsMsgMetaData::serial_size()
|
||||
uint32_t RsGxsMsgMetaData::serial_size() const
|
||||
{
|
||||
|
||||
uint32_t s = 8; // header size
|
||||
|
@ -48,6 +48,7 @@ public:
|
||||
bool deserialise(void *data, uint32_t &pktsize);
|
||||
bool serialise(void* data, uint32_t &pktsize, uint32_t api_version);
|
||||
uint32_t serial_size(uint32_t api_version) const;
|
||||
uint32_t serial_size() const { return serial_size(RS_GXS_GRP_META_DATA_CURRENT_API_VERSION); }
|
||||
void clear();
|
||||
void operator =(const RsGroupMetaData& rMeta);
|
||||
|
||||
@ -94,7 +95,7 @@ public:
|
||||
~RsGxsMsgMetaData();
|
||||
bool deserialise(void *data, uint32_t *size);
|
||||
bool serialise(void* data, uint32_t *size);
|
||||
uint32_t serial_size();
|
||||
uint32_t serial_size() const;
|
||||
void clear();
|
||||
void operator =(const RsMsgMetaData& rMeta);
|
||||
|
||||
|
@ -1028,7 +1028,7 @@ bool RsGxsDataAccess::getMsgMetaDataList( const GxsMsgReq& msgIds, const RsTokRe
|
||||
|
||||
//auto& filter( metaFilter[grpId] ); // does the initialization of metaFilter[grpId] and avoids further O(log(n)) calls
|
||||
|
||||
std::vector<RsGxsMsgMetaData*>& metaV = meta_it->second;
|
||||
std::vector<const RsGxsMsgMetaData*>& metaV = meta_it->second;
|
||||
|
||||
if (onlyLatestMsgs) // if we only consider latest messages, we need to first filter out messages with "children"
|
||||
{
|
||||
@ -1062,7 +1062,7 @@ bool RsGxsDataAccess::getMsgMetaDataList( const GxsMsgReq& msgIds, const RsTokRe
|
||||
for(uint32_t i=0;i<metaV.size();++i)
|
||||
if(!keep[i])
|
||||
{
|
||||
delete metaV[i];
|
||||
//delete metaV[i];
|
||||
metaV[i] = nullptr;
|
||||
}
|
||||
}
|
||||
@ -1122,20 +1122,20 @@ bool RsGxsDataAccess::getMsgMetaDataList( const GxsMsgReq& msgIds, const RsTokRe
|
||||
for(uint32_t i=0;i<metaV.size();++i)
|
||||
if(metaV[i] != nullptr)
|
||||
{
|
||||
RsGxsMsgMetaData* msgMeta = metaV[i];
|
||||
const RsGxsMsgMetaData* msgMeta = metaV[i];
|
||||
bool add = false;
|
||||
|
||||
/* if we are grabbing thread Head... then parentId == empty. */
|
||||
if (onlyThreadHeadMsgs && !msgMeta->mParentId.isNull())
|
||||
{
|
||||
delete msgMeta;
|
||||
//delete msgMeta;
|
||||
metaV[i] = nullptr;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (onlyOrigMsgs && !msgMeta->mOrigMsgId.isNull() && msgMeta->mMsgId != msgMeta->mOrigMsgId)
|
||||
{
|
||||
delete msgMeta;
|
||||
//delete msgMeta;
|
||||
metaV[i] = nullptr;
|
||||
continue;
|
||||
}
|
||||
@ -1187,7 +1187,7 @@ bool RsGxsDataAccess::getMsgIdList( const GxsMsgReq& msgIds, const RsTokReqOptio
|
||||
}
|
||||
|
||||
// delete meta data
|
||||
cleanseMsgMetaMap(result);
|
||||
//cleanseMsgMetaMap(result);
|
||||
|
||||
return true;
|
||||
}
|
||||
@ -1296,9 +1296,7 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req)
|
||||
return true;
|
||||
}
|
||||
|
||||
std::vector<RsGxsGrpMsgIdPair>::iterator vit_msgIds = req->mMsgIds.begin();
|
||||
|
||||
for(; vit_msgIds != req->mMsgIds.end(); ++vit_msgIds)
|
||||
for(auto vit_msgIds(req->mMsgIds.begin()); vit_msgIds != req->mMsgIds.end(); ++vit_msgIds)
|
||||
{
|
||||
MsgMetaFilter filterMap;
|
||||
|
||||
@ -1310,8 +1308,8 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req)
|
||||
GxsMsgReq msgIds;
|
||||
msgIds.insert(std::make_pair(grpMsgIdPair.first, std::set<RsGxsMessageId>()));
|
||||
mDataStore->retrieveGxsMsgMetaData(msgIds, result);
|
||||
std::vector<RsGxsMsgMetaData*>& metaV = result[grpMsgIdPair.first];
|
||||
std::vector<RsGxsMsgMetaData*>::iterator vit_meta;
|
||||
std::vector<const RsGxsMsgMetaData*>& metaV = result[grpMsgIdPair.first];
|
||||
std::vector<const RsGxsMsgMetaData*>::iterator vit_meta;
|
||||
|
||||
// msg id to relate to
|
||||
const RsGxsMessageId& msgId = grpMsgIdPair.second;
|
||||
@ -1319,10 +1317,11 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req)
|
||||
|
||||
std::set<RsGxsMessageId> outMsgIds;
|
||||
|
||||
RsGxsMsgMetaData* origMeta = nullptr;
|
||||
const RsGxsMsgMetaData* origMeta = nullptr;
|
||||
|
||||
for(vit_meta = metaV.begin(); vit_meta != metaV.end(); ++vit_meta)
|
||||
{
|
||||
RsGxsMsgMetaData* meta = *vit_meta;
|
||||
const RsGxsMsgMetaData* meta = *vit_meta;
|
||||
|
||||
if(msgId == meta->mMsgId)
|
||||
{
|
||||
@ -1337,12 +1336,11 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req)
|
||||
RsDbg() << "RsGxsDataAccess::getMsgRelatedInfo(): Cannot find meta of msgId (to relate to)!"
|
||||
<< std::endl;
|
||||
#endif
|
||||
cleanseMsgMetaMap(result);
|
||||
return false;
|
||||
}
|
||||
|
||||
const RsGxsMessageId& origMsgId = origMeta->mOrigMsgId;
|
||||
std::map<RsGxsMessageId, RsGxsMsgMetaData*>& metaMap = filterMap[grpId];
|
||||
std::map<RsGxsMessageId, const RsGxsMsgMetaData*>& metaMap = filterMap[grpId];
|
||||
|
||||
if (onlyLatestMsgs)
|
||||
{
|
||||
@ -1354,7 +1352,7 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req)
|
||||
for(vit_meta = metaV.begin(); vit_meta != metaV.end(); ++vit_meta)
|
||||
{
|
||||
|
||||
RsGxsMsgMetaData* meta = *vit_meta;
|
||||
const RsGxsMsgMetaData* meta = *vit_meta;
|
||||
|
||||
// skip msgs that aren't children.
|
||||
if (onlyChildMsgs)
|
||||
@ -1422,11 +1420,11 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req)
|
||||
/* first guess is potentially better than Orig (can't be worse!) */
|
||||
rstime_t latestTs = 0;
|
||||
RsGxsMessageId latestMsgId;
|
||||
RsGxsMsgMetaData* latestMeta=nullptr;
|
||||
const RsGxsMsgMetaData* latestMeta=nullptr;
|
||||
|
||||
for(vit_meta = metaV.begin(); vit_meta != metaV.end(); ++vit_meta)
|
||||
{
|
||||
RsGxsMsgMetaData* meta = *vit_meta;
|
||||
const RsGxsMsgMetaData* meta = *vit_meta;
|
||||
|
||||
if (meta->mOrigMsgId == origMsgId)
|
||||
{
|
||||
@ -1446,7 +1444,7 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req)
|
||||
{
|
||||
for(vit_meta = metaV.begin(); vit_meta != metaV.end(); ++vit_meta)
|
||||
{
|
||||
RsGxsMsgMetaData* meta = *vit_meta;
|
||||
const RsGxsMsgMetaData* meta = *vit_meta;
|
||||
|
||||
if (meta->mOrigMsgId == origMsgId)
|
||||
{
|
||||
@ -1482,8 +1480,6 @@ bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req)
|
||||
|
||||
outMsgIds.clear();
|
||||
filteredOutMsgIds.clear();
|
||||
|
||||
cleanseMsgMetaMap(result);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
@ -1496,7 +1492,7 @@ bool RsGxsDataAccess::getGroupStatistic(GroupStatisticRequest *req)
|
||||
GxsMsgMetaResult metaResult;
|
||||
mDataStore->retrieveGxsMsgMetaData(metaReq, metaResult);
|
||||
|
||||
const std::vector<RsGxsMsgMetaData*>& msgMetaV = metaResult[req->mGrpId];
|
||||
const std::vector<const RsGxsMsgMetaData*>& msgMetaV = metaResult[req->mGrpId];
|
||||
|
||||
req->mGroupStatistic.mGrpId = req->mGrpId;
|
||||
req->mGroupStatistic.mNumMsgs = msgMetaV.size();
|
||||
@ -1514,7 +1510,7 @@ bool RsGxsDataAccess::getGroupStatistic(GroupStatisticRequest *req)
|
||||
|
||||
for(uint32_t i = 0; i < msgMetaV.size(); ++i)
|
||||
{
|
||||
RsGxsMsgMetaData* m = msgMetaV[i];
|
||||
const RsGxsMsgMetaData* m = msgMetaV[i];
|
||||
req->mGroupStatistic.mTotalSizeOfMsgs += m->mMsgSize + m->serial_size();
|
||||
|
||||
if(obsolete_msgs.find(m->mMsgId) != obsolete_msgs.end()) // skip obsolete messages.
|
||||
@ -1540,7 +1536,7 @@ bool RsGxsDataAccess::getGroupStatistic(GroupStatisticRequest *req)
|
||||
}
|
||||
}
|
||||
|
||||
cleanseMsgMetaMap(metaResult);
|
||||
//cleanseMsgMetaMap(metaResult);
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -1595,21 +1591,19 @@ bool RsGxsDataAccess::getMsgIdList(MsgIdReq* req)
|
||||
|
||||
mDataStore->retrieveGxsMsgMetaData(req->mMsgIds, result);
|
||||
|
||||
|
||||
GxsMsgMetaResult::iterator mit = result.begin(), mit_end = result.end();
|
||||
|
||||
for(; mit != mit_end; ++mit)
|
||||
{
|
||||
const RsGxsGroupId grpId = mit->first;
|
||||
std::vector<RsGxsMsgMetaData*>& metaV = mit->second;
|
||||
std::vector<RsGxsMsgMetaData*>::iterator vit = metaV.begin(),
|
||||
std::vector<const RsGxsMsgMetaData*>& metaV = mit->second;
|
||||
std::vector<const RsGxsMsgMetaData*>::iterator vit = metaV.begin(),
|
||||
vit_end = metaV.end();
|
||||
|
||||
for(; vit != vit_end; ++vit)
|
||||
{
|
||||
RsGxsMsgMetaData* meta = *vit;
|
||||
const RsGxsMsgMetaData* meta = *vit;
|
||||
req->mMsgIdResult[grpId].insert(meta->mMsgId);
|
||||
delete meta; // discard meta data mem
|
||||
}
|
||||
}
|
||||
|
||||
@ -1622,24 +1616,24 @@ bool RsGxsDataAccess::getMsgIdList(MsgIdReq* req)
|
||||
return true;
|
||||
}
|
||||
|
||||
void RsGxsDataAccess::cleanseMsgMetaMap(GxsMsgMetaResult& result)
|
||||
{
|
||||
GxsMsgMetaResult::iterator mit = result.begin();
|
||||
|
||||
for(; mit !=result.end(); ++mit)
|
||||
{
|
||||
|
||||
std::vector<RsGxsMsgMetaData*>& msgMetaV = mit->second;
|
||||
std::vector<RsGxsMsgMetaData*>::iterator vit = msgMetaV.begin();
|
||||
for(; vit != msgMetaV.end(); ++vit)
|
||||
{
|
||||
delete *vit;
|
||||
}
|
||||
}
|
||||
|
||||
result.clear();
|
||||
return;
|
||||
}
|
||||
// void RsGxsDataAccess::cleanseMsgMetaMap(GxsMsgMetaResult& result)
|
||||
// {
|
||||
// GxsMsgMetaResult::iterator mit = result.begin();
|
||||
//
|
||||
// for(; mit !=result.end(); ++mit)
|
||||
// {
|
||||
//
|
||||
// std::vector<RsGxsMsgMetaData*>& msgMetaV = mit->second;
|
||||
// std::vector<RsGxsMsgMetaData*>::iterator vit = msgMetaV.begin();
|
||||
// for(; vit != msgMetaV.end(); ++vit)
|
||||
// {
|
||||
// delete *vit;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// result.clear();
|
||||
// return;
|
||||
// }
|
||||
|
||||
void RsGxsDataAccess::filterMsgIdList( GxsMsgIdResult& resultsMap, const RsTokReqOptions& opts, const MsgMetaFilter& msgMetas ) const
|
||||
{
|
||||
@ -1659,11 +1653,11 @@ void RsGxsDataAccess::filterMsgIdList( GxsMsgIdResult& resultsMap, const RsTokRe
|
||||
for( std::set<RsGxsMessageId>::iterator msgIdIt = msgsIdSet.begin(); msgIdIt != msgsIdSet.end(); )
|
||||
{
|
||||
const RsGxsMessageId& msgId(*msgIdIt);
|
||||
const std::map<RsGxsMessageId, RsGxsMsgMetaData*>& msgsMetaMap =
|
||||
const std::map<RsGxsMessageId, const RsGxsMsgMetaData*>& msgsMetaMap =
|
||||
cit->second;
|
||||
|
||||
bool keep = false;
|
||||
std::map<RsGxsMessageId, RsGxsMsgMetaData*>::const_iterator msgsMetaMapIt;
|
||||
std::map<RsGxsMessageId, const RsGxsMsgMetaData*>::const_iterator msgsMetaMapIt;
|
||||
|
||||
if( (msgsMetaMapIt = msgsMetaMap.find(msgId)) != msgsMetaMap.end() )
|
||||
{
|
||||
|
@ -28,7 +28,7 @@
|
||||
#include "rsgds.h"
|
||||
|
||||
|
||||
typedef std::map< RsGxsGroupId, std::map<RsGxsMessageId, RsGxsMsgMetaData*> > MsgMetaFilter;
|
||||
typedef std::map< RsGxsGroupId, std::map<RsGxsMessageId, const RsGxsMsgMetaData*> > MsgMetaFilter;
|
||||
typedef std::map< RsGxsGroupId, RsGxsGrpMetaData* > GrpMetaFilter;
|
||||
|
||||
bool operator<(const std::pair<uint32_t,GxsRequest*>& p1,const std::pair<uint32_t,GxsRequest*>& p2);
|
||||
@ -328,11 +328,11 @@ private:
|
||||
*/
|
||||
void tokenList(std::list<uint32_t> &tokens);
|
||||
|
||||
/*!
|
||||
* Convenience function to delete the ids
|
||||
* @param filter the meta filter to clean
|
||||
*/
|
||||
void cleanseMsgMetaMap(GxsMsgMetaResult& result);
|
||||
// /*!
|
||||
// * Convenience function to delete the ids
|
||||
// * @param filter the meta filter to clean
|
||||
// */
|
||||
// void cleanseMsgMetaMap(GxsMsgMetaResult& result);
|
||||
|
||||
public:
|
||||
|
||||
|
@ -953,7 +953,7 @@ void RsGxsNetService::handleRecvSyncGrpStatistics(RsNxsSyncGrpStatsItem *grs)
|
||||
#endif
|
||||
mDataStore->retrieveGxsMsgMetaData(reqIds, result);
|
||||
|
||||
const std::vector<RsGxsMsgMetaData*>& vec(result[grs->grpId]) ;
|
||||
const std::vector<const RsGxsMsgMetaData*>& vec(result[grs->grpId]) ;
|
||||
|
||||
if(vec.empty()) // that means we don't have any, or there isn't any, but since the default is always 0, no need to send.
|
||||
return ;
|
||||
@ -970,12 +970,9 @@ void RsGxsNetService::handleRecvSyncGrpStatistics(RsNxsSyncGrpStatsItem *grs)
|
||||
// be used to discard groups that are not used.
|
||||
|
||||
for(uint32_t i=0;i<vec.size();++i)
|
||||
{
|
||||
if(grs_resp->last_post_TS < vec[i]->mPublishTs)
|
||||
grs_resp->last_post_TS = vec[i]->mPublishTs;
|
||||
|
||||
delete vec[i] ;
|
||||
}
|
||||
#ifdef NXS_NET_DEBUG_6
|
||||
GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << " sending back statistics item with " << vec.size() << " elements." << std::endl;
|
||||
#endif
|
||||
@ -2953,21 +2950,19 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
|
||||
reqIds[grpId] = std::set<RsGxsMessageId>();
|
||||
GxsMsgMetaResult result;
|
||||
mDataStore->retrieveGxsMsgMetaData(reqIds, result);
|
||||
std::vector<RsGxsMsgMetaData*> &msgMetaV = result[grpId];
|
||||
std::vector<const RsGxsMsgMetaData*> &msgMetaV = result[grpId];
|
||||
|
||||
#ifdef NXS_NET_DEBUG_1
|
||||
GXSNETDEBUG_PG(item->PeerId(),grpId) << " retrieving grp message list..." << std::endl;
|
||||
GXSNETDEBUG_PG(item->PeerId(),grpId) << " grp locally contains " << msgMetaV.size() << " messsages." << std::endl;
|
||||
#endif
|
||||
std::vector<RsGxsMsgMetaData*>::const_iterator vit = msgMetaV.begin();
|
||||
std::vector<const RsGxsMsgMetaData*>::const_iterator vit = msgMetaV.begin();
|
||||
std::set<RsGxsMessageId> msgIdSet;
|
||||
|
||||
// put ids in set for each searching
|
||||
for(; vit != msgMetaV.end(); ++vit)
|
||||
{
|
||||
msgIdSet.insert((*vit)->mMsgId);
|
||||
delete(*vit);
|
||||
}
|
||||
|
||||
msgMetaV.clear();
|
||||
|
||||
#ifdef NXS_NET_DEBUG_1
|
||||
@ -4367,7 +4362,7 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsgReqItem *item,bool item_
|
||||
|
||||
GxsMsgMetaResult metaResult;
|
||||
mDataStore->retrieveGxsMsgMetaData(req, metaResult);
|
||||
std::vector<RsGxsMsgMetaData*>& msgMetas = metaResult[item->grpId];
|
||||
std::vector<const RsGxsMsgMetaData*>& msgMetas = metaResult[item->grpId];
|
||||
|
||||
#ifdef NXS_NET_DEBUG_0
|
||||
GXSNETDEBUG_PG(item->PeerId(),item->grpId) << " retrieving message meta data." << std::endl;
|
||||
@ -4395,9 +4390,9 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsgReqItem *item,bool item_
|
||||
|
||||
if(canSendMsgIds(msgMetas, *grpMeta, peer, should_encrypt_to_this_circle_id))
|
||||
{
|
||||
for(std::vector<RsGxsMsgMetaData*>::iterator vit = msgMetas.begin();vit != msgMetas.end(); ++vit)
|
||||
for(auto vit = msgMetas.begin();vit != msgMetas.end(); ++vit)
|
||||
{
|
||||
RsGxsMsgMetaData* m = *vit;
|
||||
const RsGxsMsgMetaData* m = *vit;
|
||||
|
||||
// Check reputation
|
||||
|
||||
@ -4497,8 +4492,8 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsgReqItem *item,bool item_
|
||||
|
||||
|
||||
// release meta resource
|
||||
for(std::vector<RsGxsMsgMetaData*>::iterator vit = msgMetas.begin(); vit != msgMetas.end(); ++vit)
|
||||
delete *vit;
|
||||
// for(std::vector<RsGxsMsgMetaData*>::iterator vit = msgMetas.begin(); vit != msgMetas.end(); ++vit)
|
||||
// delete *vit;
|
||||
}
|
||||
|
||||
void RsGxsNetService::locked_pushMsgRespFromList(std::list<RsNxsItem*>& itemL, const RsPeerId& sslId, const RsGxsGroupId& grp_id,const uint32_t& transN)
|
||||
@ -4542,7 +4537,7 @@ void RsGxsNetService::locked_pushMsgRespFromList(std::list<RsNxsItem*>& itemL, c
|
||||
}
|
||||
}
|
||||
|
||||
bool RsGxsNetService::canSendMsgIds(std::vector<RsGxsMsgMetaData*>& msgMetas, const RsGxsGrpMetaData& grpMeta, const RsPeerId& sslId,RsGxsCircleId& should_encrypt_id)
|
||||
bool RsGxsNetService::canSendMsgIds(std::vector<const RsGxsMsgMetaData*>& msgMetas, const RsGxsGrpMetaData& grpMeta, const RsPeerId& sslId,RsGxsCircleId& should_encrypt_id)
|
||||
{
|
||||
#ifdef NXS_NET_DEBUG_4
|
||||
GXSNETDEBUG_PG(sslId,grpMeta.mGroupId) << "RsGxsNetService::canSendMsgIds() CIRCLE VETTING" << std::endl;
|
||||
@ -4609,7 +4604,7 @@ bool RsGxsNetService::canSendMsgIds(std::vector<RsGxsMsgMetaData*>& msgMetas, co
|
||||
GXSNETDEBUG_PG(sslId,grpMeta.mGroupId) << " deleting MsgMeta entry for msg ID " << msgMetas[i]->mMsgId << " signed by " << msgMetas[i]->mAuthorId << " who is not in group circle " << circleId << std::endl;
|
||||
#endif
|
||||
|
||||
delete msgMetas[i] ;
|
||||
//delete msgMetas[i] ;
|
||||
msgMetas[i] = msgMetas[msgMetas.size()-1] ;
|
||||
msgMetas.pop_back() ;
|
||||
}
|
||||
|
@ -395,7 +395,7 @@ private:
|
||||
* @return false, if you cannot send to this peer, true otherwise
|
||||
*/
|
||||
bool canSendGrpId(const RsPeerId& sslId, const RsGxsGrpMetaData& grpMeta, std::vector<GrpIdCircleVet>& toVet, bool &should_encrypt);
|
||||
bool canSendMsgIds(std::vector<RsGxsMsgMetaData*>& msgMetas, const RsGxsGrpMetaData&, const RsPeerId& sslId, RsGxsCircleId &should_encrypt_id);
|
||||
bool canSendMsgIds(std::vector<const RsGxsMsgMetaData*>& msgMetas, const RsGxsGrpMetaData&, const RsPeerId& sslId, RsGxsCircleId &should_encrypt_id);
|
||||
|
||||
/*!
|
||||
* \brief checkPermissionsForFriendGroup
|
||||
|
@ -85,7 +85,7 @@ bool RsGxsMessageCleanUp::clean()
|
||||
|
||||
for(; mit != result.end(); ++mit)
|
||||
{
|
||||
std::vector<RsGxsMsgMetaData*>& metaV = mit->second;
|
||||
std::vector<const RsGxsMsgMetaData*>& metaV = mit->second;
|
||||
|
||||
// First, make a map of which message have a child message. This allows to only delete messages that dont have child messages.
|
||||
// A more accurate way to go would be to compute the time of the oldest message and possibly delete all the branch, but in the
|
||||
@ -99,7 +99,7 @@ bool RsGxsMessageCleanUp::clean()
|
||||
|
||||
for( uint32_t i=0;i<metaV.size();++i)
|
||||
{
|
||||
RsGxsMsgMetaData* meta = metaV[i];
|
||||
const RsGxsMsgMetaData* meta = metaV[i];
|
||||
|
||||
bool have_kids = (messages_with_kids.find(meta->mMsgId)!=messages_with_kids.end());
|
||||
|
||||
@ -132,7 +132,7 @@ bool RsGxsMessageCleanUp::clean()
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
|
||||
delete meta;
|
||||
//delete meta;
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user