added history cache optimisation to subscribed/client groups.

added a couple more ifdefs so the cache optimisation code can be compiled out.


git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@4192 b45a01b8-16f6-495d-af2f-9b41ad6348cc
chrisparker126 2011-05-08 18:41:59 +00:00
parent 6f7dadfdc5
commit cf585b3ce0
2 changed files with 87 additions and 25 deletions
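
For orientation before the diff: the sketch below is a minimal, self-contained illustration (simplified stand-in types, not the real RetroShare classes) of the two ideas combined in this commit. Pending caches are queued with an explicit literal for the third CacheDataPending argument (data, local, historical), and the history-cache optimisation call sites are wrapped in ENABLE_CACHE_OPT so the whole feature can be compiled out.

    // Sketch only: minimal stand-ins for CacheData / CacheDataPending, assuming the
    // (data, local, historical) parameter order of the CacheDataPending constructor.
    #include <iostream>
    #include <list>

    #define ENABLE_CACHE_OPT          // comment out to compile the optimisation away

    struct CacheData { int size; CacheData() : size(0) {} };

    struct CacheDataPending
    {
        CacheDataPending(const CacheData &data, bool local, bool historical)
            : mData(data), mLocal(local), mHistorical(historical) {}
        CacheData mData;
        bool mLocal;
        bool mHistorical;
    };

    static std::list<CacheDataPending> pendingHistCaches;   // historical caches, processed lazily
    static std::list<CacheDataPending> pendingCaches;       // current caches, processed as usual

    // Mirrors loadCache(): historical caches go to the lazy queue with an explicit 'true'.
    void loadCacheSketch(const CacheData &data, bool historical)
    {
        if (historical)
            pendingHistCaches.push_back(CacheDataPending(data, false, true));
        else
            pendingCaches.push_back(CacheDataPending(data, false, false));
    }

    // Mirrors the call sites guarded in this commit: the optimisation is a no-op
    // unless ENABLE_CACHE_OPT is defined.
    void processHistoryCachedSketch()
    {
    #ifdef ENABLE_CACHE_OPT
        std::cout << pendingHistCaches.size() << " historical cache(s) queued for lazy loading\n";
    #endif
    }

    int main()
    {
        loadCacheSketch(CacheData(), true);
        loadCacheSketch(CacheData(), false);
        processHistoryCachedSketch();
        return 0;
    }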

View File

@@ -242,14 +242,14 @@ int p3GroupDistrib::loadCache(const CacheData &data)
         std::cerr << std::endl;
 #endif
         /* store the cache file for later processing */
-        mPendingHistCaches.push_back(CacheDataPending(data, false, mHistoricalCaches));
+        mPendingHistCaches.push_back(CacheDataPending(data, false, true));
     }else
     {
 #ifdef DISTRIB_THREAD_DEBUG
         std::cerr << "p3GroupDistrib::loadCache() Storing non historical PendingRemoteCache";
         std::cerr << std::endl;
 #endif
-        mPendingCaches.push_back(CacheDataPending(data, false, mHistoricalCaches));
+        mPendingCaches.push_back(CacheDataPending(data, false, false));
     }
 
     if (data.size > 0)
@@ -269,6 +269,13 @@ bool p3GroupDistrib::loadLocalCache(const CacheData &data)
     std::cerr << std::endl;
 #endif
+    if(mHistoricalCaches)
+    {
+        RsStackMutex stack(distribMtx);
+        mPendingHistCaches.push_back(CacheDataPending(data, true, true));
+    }
+    else
     {
         RsStackMutex stack(distribMtx);
@@ -278,7 +285,7 @@ bool p3GroupDistrib::loadLocalCache(const CacheData &data)
 #endif
         /* store the cache file for later processing */
-        mPendingCaches.push_back(CacheDataPending(data, true, mHistoricalCaches));
+        mPendingCaches.push_back(CacheDataPending(data, true, false));
     }
 
     if (data.size > 0)
@@ -482,7 +489,8 @@ void p3GroupDistrib::updateCacheDocument()
     return;
 }
 
-void p3GroupDistrib::locked_updateCacheTableGrp(const std::vector<grpNodePair>& grpNodes, bool historical){
+void p3GroupDistrib::locked_updateCacheTableGrp(const std::vector<grpNodePair>& grpNodes, bool historical)
+{
 #ifdef DISTRIB_HISTORY_DEBUG
     std::cerr << "p3GroupDistrib::locked_updateCacheTableGrp "
@@ -502,7 +510,8 @@ void p3GroupDistrib::locked_updateCacheTableGrp(const std::vector<grpNodePair>&
     return;
 }
 
-void p3GroupDistrib::locked_updateCacheTableMsg(const std::map<std::string, std::set<pCacheId> >& msgCacheMap){
+void p3GroupDistrib::locked_updateCacheTableMsg(const std::map<std::string, std::set<pCacheId> >& msgCacheMap)
+{
 #ifdef DISTRIB_HISTORY_DEBUG
     std::cerr << "p3GroupDistrib::locked_updateCacheTableMsg() "
@@ -652,6 +661,10 @@ bool p3GroupDistrib::locked_buildCacheTable(){
 void p3GroupDistrib::locked_processHistoryCached(const std::string& grpId)
 {
+    // no processing should be done until cache locations have been stored in memory
+    if(mHistoricalCaches)
+        return;
+
 #ifdef DISTRIB_HISTORY_DEBUG
     std::cerr << "p3GroupDistrib::locked_processHistoryCached() "
               << std::endl;
@@ -729,6 +742,10 @@ void p3GroupDistrib::locked_getHistoryCacheData(const std::string& grpId, std::l
     std::set<pCacheId>::iterator pit, pitEnd;
     CacheData cDataTemp;
+    GroupInfo* grpInfo = locked_getGroupInfo(grpId);
+    std::map<CacheId, CacheData>::iterator mit;
+    bool subscribed = grpInfo->flags & RS_DISTRIB_SUBSCRIBED;
+
     nit = mCacheTable.find(grpId);
 
     if(nit != mCacheTable.end()){
@@ -740,7 +757,14 @@ void p3GroupDistrib::locked_getHistoryCacheData(const std::string& grpId, std::l
             cDataTemp.cid.subid = pit->second;
             cDataTemp.pid = pit->first;
-            locked_getStoredCache(cDataTemp);
+
+            if(subscribed){
+                mit = mLocalHistCachesAvail.find(CacheId(CacheSource::getCacheType(), cDataTemp.cid.subid));
+                if(mit != mLocalHistCachesAvail.end()) cDataTemp = mit->second;
+            }
+            else
+                locked_getStoredCache(cDataTemp);
+
             cDataSet.push_back(cDataTemp);
         }
     }
@@ -764,8 +788,8 @@ bool p3GroupDistrib::locked_loadHistoryCacheFile()
     std::string hFileName = mKeyBackUpDir + "/" + HIST_CACHE_FNAME;
     std::ifstream hFile(hFileName.c_str());
     int fileLength;
-    char* fileLoadBuffer;
-    char* decryptedCacheFile;
+    char* fileLoadBuffer = NULL;
+    char* decryptedCacheFile = NULL;
     int outlen = 0;
     bool ok = false;
 
     hFile.seekg(0, std::ios::end);
@@ -782,15 +806,22 @@ bool p3GroupDistrib::locked_loadHistoryCacheFile()
     ok = AuthSSL::getAuthSSL()->decrypt((void*&)decryptedCacheFile, outlen,
             fileLoadBuffer, fileLength);
 
+    if(fileLoadBuffer != NULL)
+        delete[] fileLoadBuffer;
+
     char* buffer = static_cast<char*>(pugi::get_memory_allocation_function()(outlen));
 
     if(ok){
         memcpy(buffer, decryptedCacheFile, outlen);
+        ok &= mCacheDoc.load_buffer_inplace_own(buffer, outlen);
+
+        if(decryptedCacheFile != NULL)
+            delete[] decryptedCacheFile;
+
+    }else{
+
+        if(buffer !=NULL)
+            delete[] buffer;
     }
-
-    delete[] fileLoadBuffer;
-    delete[] decryptedCacheFile;
-
-    ok &= mCacheDoc.load_buffer_inplace_own(buffer, outlen);
 
     return ok;
 }
@@ -817,16 +848,18 @@ bool p3GroupDistrib::locked_saveHistoryCacheFile()
     fileBuffer = new char[cacheContent.size()];
     cacheContent.copy(fileBuffer, cacheContent.size(), 0);
 
     ok = AuthSSL::getAuthSSL()->encrypt((void*&)encryptedFileBuffer, outlen,
             (void*&)fileBuffer, streamLength, mOwnId);
 
-    hFile.write(encryptedFileBuffer, outlen);
-    hFile.close();
+    if(ok){
+        hFile.write(encryptedFileBuffer, outlen);
+        hFile.close();
+    }
 
-    if(fileBuffer)
+    if(fileBuffer != NULL)
         delete[] fileBuffer;
 
-    if(encryptedFileBuffer)
+    if(encryptedFileBuffer != NULL)
         delete[] encryptedFileBuffer;
 
     return ok;
@@ -842,6 +875,8 @@ CacheDataPending::CacheDataPending(const CacheData &data, bool local, bool histo
 void p3GroupDistrib::HistoricalCachesDone()
 {
     RsStackMutex stack(distribMtx);
+    std::string id;
+    cachesAvailable(id, mLocalHistCachesAvail);
     mHistoricalCaches = false; // called when Stored Caches have been added to Pending List.
 }
@@ -2286,7 +2321,9 @@ bool p3GroupDistrib::getAllMsgList(std::string grpId, std::list<std::string> &ms
     RsStackMutex stack(distribMtx); /************* STACK MUTEX ************/
 
+#ifdef ENABLE_CACHE_OPT
     locked_processHistoryCached(grpId);
+#endif
 
     std::map<std::string, GroupInfo>::iterator git;
     if (mGroups.end() == (git = mGroups.find(grpId)))
@@ -2309,7 +2346,9 @@ bool p3GroupDistrib::getParentMsgList(std::string grpId, std::string pId,
 {
     RsStackMutex stack(distribMtx); /************* STACK MUTEX ************/
 
+#ifdef ENABLE_CACHE_OPT
     locked_processHistoryCached(grpId);
+#endif
 
     std::map<std::string, GroupInfo>::iterator git;
     if (mGroups.end() == (git = mGroups.find(grpId)))
@@ -2707,10 +2746,21 @@ bool p3GroupDistrib::loadList(std::list<RsItem *>& load)
 {
     std::list<RsItem *>::iterator lit;
 
+#ifdef ENABLE_CACHE_OPT
+    {
+        RsStackMutex stack(distribMtx);
+
+        if(locked_loadHistoryCacheFile())
+            locked_buildCacheTable();
+    }
+#endif
+
     /* for child config data */
     std::list<RsItem* > childLoadL;
     RsSerialType* childSer = createSerialiser();
+    grpCachePair gcPair;
+    pCacheId cId;
+    bool cached = false;
 
     for(lit = load.begin(); lit != load.end(); lit++)
     {
         /* decide what type it is */
@@ -2724,8 +2774,19 @@ bool p3GroupDistrib::loadList(std::list<RsItem *>& load)
         if ((newGrp = dynamic_cast<RsDistribGrp *>(*lit)))
         {
             const std::string &gid = newGrp -> grpId;
-            loadGroup(newGrp, false);
+            if(loadGroup(newGrp, false)){
+#ifdef ENABLE_CACHE_OPT
+                RsStackMutex stack(distribMtx);
+
+                if(!locked_historyCached(newGrp->grpId, cached)){
+                    cId = pCacheId(mOwnId, 1);
+                    gcPair = std::make_pair(newGrp->grpId, cId);
+                    mGrpHistPending.push_back(gcPair);
+                    mUpdateCacheDoc = true;
+                }
+#endif
+            }
             subscribeToGroup(gid, true);
         }
         else if ((newKey = dynamic_cast<RsDistribGrpKey *>(*lit)))
@@ -2767,11 +2828,6 @@ bool p3GroupDistrib::loadList(std::list<RsItem *>& load)
         mGroupsRepublish = false;
 
     delete childSer;
 
-#ifdef ENABLE_CACHE_OPT
-    if(locked_loadHistoryCacheFile())
-        locked_buildCacheTable();
-#endif
-
     return true;
 }
@@ -5066,7 +5122,10 @@ bool p3GroupDistrib::getDummyParentMsgList(std::string grpId, std::string pId, s
     RsStackMutex stack(distribMtx); /************* STACK MUTEX ************/
 
     // load grp from history cache if not already loaded
+#ifdef ENABLE_CACHE_OPT
     locked_processHistoryCached(grpId);
+#endif
 
     std::map<std::string, GroupInfo>::iterator git;
     if (mGroups.end() == (git = mGroups.find(grpId)))
     {
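
The behavioural core of the changes above is the new branch in locked_getHistoryCacheData(): for subscribed groups the cache descriptor is looked up in mLocalHistCachesAvail (the locally available history caches snapshotted in HistoricalCachesDone()), and only otherwise does the code fall back to locked_getStoredCache(). A rough sketch of that lookup, with simplified stand-in types and hypothetical helper names (the real code keys the map by CacheId(CacheSource::getCacheType(), subid)):

    // Sketch only: simplified CacheId/CacheData and stand-in helpers, not the real p3distrib code.
    #include <iostream>
    #include <map>
    #include <string>

    struct CacheId
    {
        CacheId(int type, int subid) : mType(type), mSubId(subid) {}
        int mType, mSubId;
        bool operator<(const CacheId &o) const
        { return (mType != o.mType) ? (mType < o.mType) : (mSubId < o.mSubId); }
    };

    struct CacheData { std::string pid; int subid; std::string path; CacheData() : subid(0) {} };

    static std::map<CacheId, CacheData> localHistCachesAvail;   // stands in for mLocalHistCachesAvail

    // Stand-in for locked_getStoredCache(): fills the descriptor from the stored cache table.
    static void getStoredCacheSketch(CacheData &c) { c.path = "stored/" + c.pid; }

    // Mirrors the new branch: subscribed groups prefer the locally available cache entry.
    static void getHistoryCacheDataSketch(bool subscribed, int cacheType, CacheData &c)
    {
        if (subscribed)
        {
            std::map<CacheId, CacheData>::iterator mit =
                localHistCachesAvail.find(CacheId(cacheType, c.subid));
            if (mit != localHistCachesAvail.end())
                c = mit->second;
        }
        else
        {
            getStoredCacheSketch(c);
        }
    }

    int main()
    {
        CacheData local; local.subid = 1; local.path = "local/cache-1";
        localHistCachesAvail[CacheId(7, 1)] = local;    // '7' is an arbitrary cache type for the sketch

        CacheData query; query.subid = 1; query.pid = "peer";
        getHistoryCacheDataSketch(true, 7, query);
        std::cout << query.path << std::endl;           // prints "local/cache-1"
        return 0;
    }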

View File

@@ -402,7 +402,8 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu
     /*!
-     *
+     * loads cache data which contains location of cache files belonging
+     * to group
      * @param grpId grp for which to get list of cache data
      * @param cDataSet cache data belonging to grp is loaded into this list
      */
@@ -882,6 +883,8 @@ RsDistribDummyMsg *locked_getGroupDummyMsg(std::string grpId, std::string msgId)
     std::list<CacheDataPending> mPendingHistCaches;
+    std::map<CacheId, CacheData> mLocalHistCachesAvail;
+
     time_t mLastCacheDocUpdate;
     bool mUpdateCacheDoc, mHistoricalCachesLoaded;
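
The new mLocalHistCachesAvail member declared above is populated once, in HistoricalCachesDone(), via the cachesAvailable(id, mLocalHistCachesAvail) call added in this commit, and is then consulted by locked_getHistoryCacheData() for subscribed groups. A toy sketch of that hand-off, with simplified types and a stand-in cachesAvailable() (not RetroShare's actual CacheSource API):

    // Sketch only: toy version of the cache-source hand-off added in HistoricalCachesDone().
    #include <map>
    #include <string>

    struct CacheId
    {
        CacheId(int type, int subid) : mType(type), mSubId(subid) {}
        int mType, mSubId;
        bool operator<(const CacheId &o) const
        { return (mType != o.mType) ? (mType < o.mType) : (mSubId < o.mSubId); }
    };

    struct CacheData { std::string path; };

    class GroupDistribSketch
    {
    public:
        GroupDistribSketch() : mHistoricalCaches(true) {}

        // Stand-in for CacheSource::cachesAvailable(id, map): copies the caches this
        // node currently publishes into the supplied map.
        void cachesAvailable(const std::string & /*id*/, std::map<CacheId, CacheData> &out)
        {
            out = mOwnPublishedCaches;
        }

        // Mirrors HistoricalCachesDone(): snapshot the locally available caches,
        // then mark the historical-load phase as finished.
        void historicalCachesDone()
        {
            std::string id;
            cachesAvailable(id, mLocalHistCachesAvail);
            mHistoricalCaches = false;
        }

        std::map<CacheId, CacheData> mOwnPublishedCaches;
        std::map<CacheId, CacheData> mLocalHistCachesAvail;   // counterpart of the new member
        bool mHistoricalCaches;
    };

    int main()
    {
        GroupDistribSketch g;
        CacheData c; c.path = "own/cache-1";
        g.mOwnPublishedCaches[CacheId(7, 1)] = c;
        g.historicalCachesDone();                             // mLocalHistCachesAvail now populated
        return g.mLocalHistCachesAvail.size() == 1 ? 0 : 1;
    }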