first iteration of history cache opt

compile with ENABLE_CACHE_OPT
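As a pointer for testing, a minimal sketch of turning the flag on; the qmake variable and .pro file are assumptions about the build setup, not part of this commit:

    # libretroshare.pro (assumed)
    DEFINES += ENABLE_CACHE_OPT

or pass -DENABLE_CACHE_OPT directly to the compiler.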

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@4116 b45a01b8-16f6-495d-af2f-9b41ad6348cc
Author: chrisparker126
Date:   2011-03-31 21:41:13 +00:00
parent  26311a7ad6
commit  c22a0c79e7
2 changed files with 312 additions and 118 deletions
libretroshare/src/services


@@ -34,6 +34,8 @@
 #include <openssl/evp.h>
 #include <openssl/rand.h>
 #include <algorithm>
+#include <sstream>
+#include <fstream>
 #include "retroshare/rsdistrib.h"
 #include "services/p3distrib.h"
@@ -47,6 +49,9 @@
 #include "pqi/authssl.h"
 #include "pqi/authgpg.h"
+#define FAILED_CACHE_CONT "failedcachegrp" // cache ids that have failed are stored under a node with this name/grpId
+#define HIST_CACHE_FNAME "grp_history.xml"
 /*****
  * #define DISTRIB_DEBUG 1
  * #define DISTRIB_THREAD_DEBUG 1
@@ -63,6 +68,18 @@ RSA *extractPrivateKey(RsTlvSecurityKey &key);
 void setRSAPublicKey(RsTlvSecurityKey &key, RSA *rsa_pub);
 void setRSAPrivateKey(RsTlvSecurityKey &key, RSA *rsa_priv);
+// add one set to another while not replacing elements unique to the left operand
+void operator+=(std::set<pCacheId>& left, const std::set<pCacheId>& right){
+	std::set<pCacheId>::const_iterator sit = right.begin();
+	for(; sit != right.end(); sit++)
+		left.insert(*sit);
+	return;
+}
 GroupInfo::~GroupInfo()
 {
 	delete distribGroup ;
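Note: the operator+= above is equivalent to std::set's range insert, since insert() already skips keys that are present. A minimal standalone sketch; the pair shape of pCacheId is an assumption for illustration, not taken from this commit:

    #include <iostream>
    #include <set>
    #include <stdint.h>
    #include <string>
    #include <utility>

    typedef std::pair<std::string, uint16_t> pCacheId; // assumed shape

    void merge(std::set<pCacheId>& left, const std::set<pCacheId>& right)
    {
        // insert() ignores duplicates, so elements unique to `left` survive
        left.insert(right.begin(), right.end());
    }

    int main()
    {
        std::set<pCacheId> a, b;
        a.insert(pCacheId("peerA", 1));
        b.insert(pCacheId("peerA", 1)); // duplicate, ignored on merge
        b.insert(pCacheId("peerB", 2));
        merge(a, b);
        std::cout << a.size() << std::endl; // prints 2
        return 0;
    }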
@@ -93,7 +110,9 @@ p3GroupDistrib::p3GroupDistrib(uint16_t subtype,
 	/* force publication of groups (cleared if local cache file found) */
 	mGroupsRepublish = true;
 	mGroupsChanged = true;
+	mCount = 0;
 	mLastCacheDocUpdate = time(NULL);
+	mHistoricalCachesLoaded = false;
 	mOwnId = AuthSSL::getAuthSSL()->OwnId();
@@ -123,8 +142,8 @@ int p3GroupDistrib::tick()
 	{
 		RsStackMutex stack(distribMtx); /**** STACK LOCKED MUTEX ****/
 		toPublish = ((mPendingPublish.size() > 0) || (mPendingPubKeyRecipients.size() > 0)) && (now > (time_t) (mPubPeriod + mLastPublishTime));
 	}
 	if (toPublish)
@@ -181,15 +200,20 @@ int p3GroupDistrib::tick()
 	bool updateCacheDoc = false;
 	{
 		RsStackMutex stack(distribMtx);
-		updateCacheDoc = (now > (time_t) (mLastCacheDocUpdate + 15));
-		updateCacheDoc &= !mHistoricalCaches && mUpdateCacheDoc;
+		updateCacheDoc = (now > (time_t) (mLastCacheDocUpdate + 30));
+		updateCacheDoc &= !mHistoricalCaches && mUpdateCacheDoc && mHistoricalCachesLoaded;
+#ifdef DISTRIB_HISTORY_DEBUG
+		std::cerr << "num pending grps: " << mGrpHistPending.size() << std::endl;
+		std::cerr << "num pending msgs: " << mMsgHistPending.size() << std::endl;
+		std::cerr << "num unique cache ids in table: " << mCachePairsInTable.size() << std::endl;
+#endif
 	}
-#ifdef ENABLE_CACHE_OPT
-	if(updateCacheDoc)
+	if(updateCacheDoc){
+		std::cerr << "count: " << mCount << std::endl;
 		updateCacheDocument();
-#endif
+	}
 	return 0;
 }
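The gate added to tick() is a plain time-based throttle. A condensed sketch of the condition, with names mirroring the members above (a sketch, not the committed code):

    // flush the cache document at most once every 30s, and only after the
    // historical load phase is over and something is actually pending
    bool shouldUpdateCacheDoc(time_t now, time_t lastUpdate, bool docDirty,
                              bool histPhaseOver, bool histLoaded)
    {
        return (now > lastUpdate + 30) && histPhaseOver && docDirty && histLoaded;
    }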
@@ -208,14 +232,21 @@ int p3GroupDistrib::loadCache(const CacheData &data)
 	std::cerr << std::endl;
 #endif
+	if(mHistoricalCaches)
 	{
 		RsStackMutex stack(distribMtx);
 #ifdef DISTRIB_THREAD_DEBUG
-		std::cerr << "p3GroupDistrib::loadCache() Storing PendingRemoteCache";
+		std::cerr << "p3GroupDistrib::loadCache() Storing historical PendingRemoteCache";
 		std::cerr << std::endl;
 #endif
 		/* store the cache file for later processing */
+		mPendingHistCaches.push_back(CacheDataPending(data, false, mHistoricalCaches));
+	}else
+	{
+#ifdef DISTRIB_THREAD_DEBUG
+		std::cerr << "p3GroupDistrib::loadCache() Storing non historical PendingRemoteCache";
+		std::cerr << std::endl;
+#endif
 		mPendingCaches.push_back(CacheDataPending(data, false, mHistoricalCaches));
 	}
@@ -257,34 +288,6 @@ bool p3GroupDistrib::loadLocalCache(const CacheData &data)
 }
-bool p3GroupDistrib::loadCacheDoc(RsDistribConfigData& historyDoc)
-{
-	if((historyDoc.service_data.bin_len == 0) || (historyDoc.service_data.bin_data == NULL)){
-#ifdef DISTRIB_HISTORY_DEBUG
-		std::cerr << "p3GroupDistrib::loadCacheDoc" << "Failed to load cache doc!"
-				<< "\nNo Data!"
-				<< std::endl;
-#endif
-		return false;
-	}
-	RsStackMutex stack(distribMtx);
-	pugi::xml_parse_result result;
-	result = mCacheDoc.load_buffer_inplace_own(historyDoc.service_data.bin_data,
-			historyDoc.service_data.bin_len);
-	if(!result){
-#ifdef DISTRIB_HISTORY_DEBUG
-		std::cerr << "p3GroupDistrib::loadCacheDoc" << "Failed to load cache doc!"
-				<< std::endl;
-#endif
-		return false;
-	}
-	return true;
-}
 void p3GroupDistrib::updateCacheDocument()
 {
@@ -298,6 +301,18 @@ void p3GroupDistrib::updateCacheDocument()
 #endif
 	std::vector<grpNodePair> grpNodes;
+	std::string failedCacheId = FAILED_CACHE_CONT;
+	// if the failed-cache content node has not been created yet, add it to the doc
+	if(mCacheTable.find(failedCacheId) == mCacheTable.end()){
+		mCacheDoc.append_child("group");
+		mCacheDoc.last_child().append_child("grpId").append_child(
+				pugi::node_pcdata).set_value(failedCacheId.c_str());
+		grpNodes.push_back(grpNodePair(failedCacheId, mCacheDoc.last_child()));
+	}
 	std::map<std::string, nodeCache>::iterator nodeCache_iter;
 	// for transforming int to string
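For orientation, the document node pattern used throughout updateCacheDocument() can be reproduced standalone with pugixml. A minimal sketch; the per-message element name "msg" and the sample values are assumptions, since those lines are elided from this hunk:

    #include <iostream>
    #include "pugixml.hpp"

    int main()
    {
        pugi::xml_document doc;
        // <group><grpId>failedcachegrp</grpId></group>
        pugi::xml_node grp = doc.append_child("group");
        grp.append_child("grpId").append_child(pugi::node_pcdata)
           .set_value("failedcachegrp");

        // message cache ids hang off a <messages> child: peer id + sub id
        pugi::xml_node msg = grp.append_child("messages").append_child("msg");
        msg.append_child("pId").append_child(pugi::node_pcdata).set_value("peer01");
        msg.append_child("subId").append_child(pugi::node_pcdata).set_value("2");

        doc.save(std::cout); // prints the nested XML
        return 0;
    }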
@@ -369,7 +384,12 @@ void p3GroupDistrib::updateCacheDocument()
 	// now update document with new msg cache info
 	msgIt = mMsgHistPending.begin();
+	std::vector<grpCachePair> msgHistRestart;
 	pugi::xml_node messages_node;
+	pCacheId pCid;
+	int count = 0;
+	int count2 = 0, count3 = 0;
 	for(; msgIt != mMsgHistPending.end(); msgIt++)
 	{
@@ -379,6 +399,20 @@ void p3GroupDistrib::updateCacheDocument()
 		if(nodeCache_iter != mCacheTable.end()){
+			pCid = pCacheId(msgIt->second.first,
+					msgIt->second.second);
+			// ensure you don't add cache ids twice to the same group
+			// by checking cache table and current msg additions
+//			if(nodeCache_iter->second.cIdSet.find(pCid) !=
+//					nodeCache_iter->second.cIdSet.end())
+//				count2++;
+//
+//			if(msgCacheMap[msgIt->first].find(pCid) != msgCacheMap[msgIt->first].end())
+//				count3++;
 			nodeIter = nodeCache_iter->second.node;
 			messages_node = nodeIter.child("messages");
@@ -392,15 +426,14 @@ void p3GroupDistrib::updateCacheDocument()
 			messages_node.last_child().append_child("pId").append_child(
 					pugi::node_pcdata).set_value(msgIt->second.first
 							.c_str());
 			sprintf(subIdBuffer, "%d", msgIt->second.second);
 			subId = subIdBuffer;
 			messages_node.last_child().append_child("subId").append_child(
 					pugi::node_pcdata).set_value(subId.c_str());
 			// add msg to grp set
-			msgCacheMap[msgIt->first].insert(pCacheId(msgIt->second.first,
-					msgIt->second.second));
+			msgCacheMap[msgIt->first].insert(pCid);
+			count++;
 		}
 		else{
@@ -410,16 +443,34 @@ void p3GroupDistrib::updateCacheDocument()
 					<< "\nBut Parent group does not exists in cache table!"
 					<< std::endl;
 #endif
+			// remove from map but keep for later in case historical grp loads aren't done yet
 			msgCacheMap.erase(msgIt->first);
+			msgHistRestart.push_back(*msgIt);
 		}
 	}
 	// now update cache table by tagging msg cache ids to their
 	// respective groups
 	locked_updateCacheTableMsg(msgCacheMap);
-	// clear pending as updating finished
+	// clear msg pending if all pending historical grps have been loaded
+	if(mHistoricalCachesLoaded && (mGrpHistPending.size() == 0)){
+#ifdef DISTRIB_HISTORY_DEBUG
+		std::cerr << "mMsgHistRestart() " << msgHistRestart.size() << std::endl;
+		std::cerr << "mMsgHistPending() " << mMsgHistPending.size() << std::endl;
+		std::cerr << "count: " << count << " " << count2 << " " << count3 << std::endl;
+#endif
 		mMsgHistPending.clear();
+	}
+	else // if not, keep the messages for the next round of loads
+	{
+		mMsgHistPending.clear();
+		mMsgHistPending = msgHistRestart;
+	}
 	// indicate latest update to reset tick observer
 	mLastCacheDocUpdate = time(NULL);
@@ -453,6 +504,7 @@ void p3GroupDistrib::locked_updateCacheTableMsg(const std::map<std::string, std::set<pCacheId> >& msgCacheMap)
 #ifdef DISTRIB_HISTORY_DEBUG
 	std::cerr << "p3GroupDistrib::locked_updateCacheTableMsg() "
+			<< "loading " << msgCacheMap.size() << " messages"
 			<< std::endl;
 #endif
@@ -471,9 +523,21 @@ void p3GroupDistrib::locked_updateCacheTableMsg(const std::map<std::string, std::set<pCacheId> >& msgCacheMap)
 	{
 		cit = mCacheTable.find(mit->first);
+#ifdef DISTRIB_HISTORY_DEBUG
+		std::cerr << "p3GroupDistrib::locked_updateCacheTableMsg() "
+				<< "\nAdding." << mit->second.size() << "to grp "
+				<< mit->first << std::endl;
+#endif
 		// add new cache ids to grp
 		if(cit != mCacheTable.end()){
-			cit->second.cIdSet = mit->second;
+			cit->second.cIdSet += mit->second;
+			// don't add failed caches to cache pairs in table
+			if(mit->first != FAILED_CACHE_CONT)
+				mCachePairsInTable += mit->second;
+			else
+				mCacheFailedTable += mit->second;
 		}else{
 #ifdef DISTRIB_HISTORY_DEBUG
 			std::cerr << "p3GroupDistrib::locked_updateCacheTableMsg() "
@@ -499,24 +563,19 @@ bool p3GroupDistrib::locked_historyCached(const std::string& grpId, bool& cached)
 	return false;
 }
-bool p3GroupDistrib::locked_historyCached(const std::string& grpId, const pCacheId& cId, bool& cached)
+bool p3GroupDistrib::locked_historyCached(const pCacheId& cId)
 {
-	cached = false;
-	std::map<std::string, nodeCache>::iterator cit;
-	if(mCacheTable.end() != (cit = mCacheTable.find(grpId)))
-	{
-		if(cit->second.cIdSet.find(cId) != cit->second.cIdSet.end()){
-			cached = cit->second.cached;
-			return true;
-		}
-	}
+	if(mCachePairsInTable.find(cId) != mCachePairsInTable.end())
+		return true;
+	if(mCacheFailedTable.find(cId) != mCacheFailedTable.end())
+		return true;
 	return false;
 }
-bool p3GroupDistrib::buildCacheTable(){
+bool p3GroupDistrib::locked_buildCacheTable(){
 #ifdef DISTRIB_HISTORY_DEBUG
 	std::cerr << "p3GroupDistrib::buildCacheTable()"
@@ -534,6 +593,7 @@ bool p3GroupDistrib::buildCacheTable(){
 	pugi::xml_node_iterator grpIt = mCacheDoc.begin(), msgIt;
 	pugi::xml_node messages_node;
+	std::map<std::string, std::set<pCacheId> > msgCacheMap;
 	std::vector<grpNodePair> grpNodes;
@@ -545,7 +605,7 @@ bool p3GroupDistrib::buildCacheTable(){
 		grpId = grpIt->child_value("grpId");
 		// add grps to grp node list
-		grpNodes.push_back(grpNodePair(grpId, mCacheDoc.last_child()/*(*grpIt)*/));
+		grpNodes.push_back(grpNodePair(grpId, *grpIt));
 		messages_node = grpIt->child("messages");
 		if(messages_node){
@@ -696,6 +756,80 @@ void p3GroupDistrib::locked_getHistoryCacheData(const std::string& grpId, std::list<CacheData>& cDataSet)
 	return;
 }
+bool p3GroupDistrib::locked_loadHistoryCacheFile()
+{
+	std::string hFileName = mKeyBackUpDir + "/" + HIST_CACHE_FNAME;
+	std::ifstream hFile(hFileName.c_str());
+	int fileLength;
+	char* fileLoadBuffer;
+	char* decryptedCacheFile;
+	int outlen = 0;
+	bool ok = false;
+	hFile.seekg(0, std::ios::end);
+	fileLength = hFile.tellg();
+	hFile.seekg(0, std::ios::beg);
+	if(fileLength <= 0)
+		return false;
+	fileLoadBuffer = new char[fileLength];
+	hFile.read(fileLoadBuffer, fileLength);
+	hFile.close();
+	ok = AuthSSL::getAuthSSL()->decrypt((void*&)decryptedCacheFile, outlen,
+			fileLoadBuffer, fileLength);
+	char* buffer = static_cast<char*>(pugi::get_memory_allocation_function()(outlen));
+	memcpy(buffer, decryptedCacheFile, outlen);
+	ok &= mCacheDoc.load_buffer_inplace_own(buffer, outlen);
+	delete[] fileLoadBuffer;
+	delete[] decryptedCacheFile;
+	return ok;
+}
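Note on the pugi hand-off above: load_buffer_inplace_own() takes ownership of the buffer and later frees it with pugixml's deallocation function, which is why the decrypted bytes are first copied into memory obtained from pugi::get_memory_allocation_function() rather than handed over directly. Condensed, the pattern is (a sketch with error handling elided):

    // the buffer must come from pugi's allocator, because the document frees it
    char* buf = static_cast<char*>(pugi::get_memory_allocation_function()(len));
    memcpy(buf, decrypted, len);
    pugi::xml_parse_result res = doc.load_buffer_inplace_own(buf, len);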
+bool p3GroupDistrib::locked_saveHistoryCacheFile()
+{
+	std::cout << mCacheDoc.last_child().value();
+	if(mCacheDoc.empty())
+		return false;
+	std::string hFileName = mKeyBackUpDir + "/" + HIST_CACHE_FNAME;
+	std::ofstream hFile(hFileName.c_str());
+	std::ostringstream cacheStream;
+	char* fileBuffer = NULL;
+	int streamLength;
+	char* encryptedFileBuffer = NULL;
+	int outlen = 0;
+	bool ok = false;
+	mCacheDoc.save(cacheStream);
+	streamLength = cacheStream.str().length();
+	std::string cacheContent = cacheStream.str();
+	fileBuffer = new char[cacheContent.size()];
+	cacheContent.copy(fileBuffer, cacheContent.size(), 0);
+	ok = AuthSSL::getAuthSSL()->encrypt((void*&)encryptedFileBuffer, outlen,
+			(void*&)fileBuffer, streamLength, mOwnId);
+	hFile.write(encryptedFileBuffer, outlen);
+	hFile.close();
+	if(fileBuffer)
+		delete[] fileBuffer;
+	if(encryptedFileBuffer)
+		delete[] encryptedFileBuffer;
+	return ok;
+}
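Together the two helpers give the XML cache an encrypted round trip through the node's own key: save serialises mCacheDoc to a string, encrypts it to mOwnId, and writes the ciphertext; load reverses the steps. Schematically, using the AuthSSL calls with the argument shapes they have in this diff (a sketch, lifetimes and error handling simplified):

    std::ostringstream xml;
    mCacheDoc.save(xml);                               // XML -> plaintext string
    std::string plain = xml.str();

    char* cipher = NULL; int clen = 0;
    char* plainBuf = new char[plain.size()];
    plain.copy(plainBuf, plain.size(), 0);
    AuthSSL::getAuthSSL()->encrypt((void*&)cipher, clen,
            (void*&)plainBuf, plain.size(), mOwnId);   // encrypt to own key

    char* back = NULL; int blen = 0;
    AuthSSL::getAuthSSL()->decrypt((void*&)back, blen,
            cipher, clen);                             // recovers the XML bytes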
 /* Handle the Cache Pending Setup */
 CacheDataPending::CacheDataPending(const CacheData &data, bool local, bool historical)
 	:mData(data), mLocal(local), mHistorical(historical)
@@ -709,6 +843,12 @@ void p3GroupDistrib::HistoricalCachesDone()
 	mHistoricalCaches = false; // called when Stored Caches have been added to Pending List.
 }
+void p3GroupDistrib::HistoricalCachesLoaded()
+{
+	RsStackMutex stack(distribMtx);
+	mHistoricalCachesLoaded = true;
+}
 /* From RsThread */
 void p3GroupDistrib::run() /* called once the thread is started */
 {
@@ -721,18 +861,31 @@ void p3GroupDistrib::run() /* called once the thread is started */
 #ifdef DISTRIB_DUMMYMSG_DEBUG
 	int printed = 0;
 #endif
+	CacheData cache;
 	while(1)
 	{
 		/* */
-		CacheData cache;
 		bool validCache = false;
 		bool isLocal = false;
 		bool isHistorical = false;
 		{
 			RsStackMutex stack(distribMtx);
-			if (mPendingCaches.size() > 0)
+			if(!mHistoricalCaches){
+				if(mPendingHistCaches.size() > 0){
+//					std::cerr << "loaded pending caches: " << mPendingHistCaches.size() << std::endl;
+					CacheDataPending &pendingCache = mPendingHistCaches.front();
+					cache = pendingCache.mData;
+					isLocal = pendingCache.mLocal;
+					isHistorical = pendingCache.mHistorical;
+					validCache = true;
+					mPendingHistCaches.pop_front();
+				}
+				else if (mPendingCaches.size() > 0)
 				{
 					CacheDataPending &pendingCache = mPendingCaches.front();
 					cache = pendingCache.mData;
@@ -749,11 +902,16 @@ void p3GroupDistrib::run() /* called once the thread is started */
 				}
 			}
+		}
 		if (validCache)
 		{
 			loadAnyCache(cache, isLocal, isHistorical);
+			if(!mHistoricalCachesLoaded){
+				if(mPendingHistCaches.size() == 0)
+					HistoricalCachesLoaded();
+			}
 #ifndef WINDOWS_SYS
 			usleep(1000);
 #else
@@ -800,7 +958,6 @@ int p3GroupDistrib::loadAnyCache(const CacheData &data, bool local, bool historical)
 	std::cerr << "Cid: " << data.cid.type << ":" << data.cid.subid << std::endl;
 #endif
-	// if its historical then don't load if xml doc exists
 	if (data.cid.subid == 1)
 	{
@@ -900,7 +1057,6 @@ void p3GroupDistrib::loadFileGroups(const std::string &filename, const std::stri
 	return;
 }
 void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical)
 {
@@ -910,9 +1066,25 @@ void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical)
 #endif
 	time_t now = time(NULL);
-	//time_t start = now;
-	//time_t end = 0;
+	bool cache = false;
+	// if the cache id already exists in the cache table, exit
+	{
+		RsStackMutex stack(distribMtx);
+		if(locked_historyCached(pCacheId(src, cacheSubId))){
+			return;
+		}
+		else
+		{
+			cache = true;
+		}
+	}
+	// link grp to cache id (only one cache id, so it doesn't matter if one grp comes out twice
+	// with the same cache id)
+	std::map<std::string, pCacheId> msgCacheMap;
+	pCacheId failedCache = pCacheId(src, cacheSubId);
 	/* create the serialiser to load msgs */
 	BinInterface *bio = new BinFileInterface(filename.c_str(), BIN_FLAGS_READABLE);
 	pqistore *store = createStore(bio, src, BIN_FLAGS_READABLE);
@@ -923,6 +1095,9 @@ void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical)
 	RsItem *item;
 	RsDistribSignedMsg *newMsg;
+	std::string grpId;
 	while(NULL != (item = store->GetItem()))
 	{
@@ -935,32 +1110,14 @@ void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical)
 		if ((newMsg = dynamic_cast<RsDistribSignedMsg *>(item)))
 		{
-			std::string grpId = newMsg->grpId;
-			bool cached = false;
-			pCacheId cId(src, cacheSubId);
-			RsStackMutex stack(distribMtx);
-			// if msg cache not present in table then load immediately
-			if(!locked_historyCached(newMsg->grpId, cId, cached) || !historical){
-#ifdef DISTRIB_HISTORY_DEBUG
-				std::cerr << "p3GroupDistrib::loadFileMsgs()"
-						<< "\nNo msg cache not present, loading"
-						<< std::endl;
-#endif
-				if(locked_loadMsg(newMsg, src, local, historical)){
-					mMsgHistPending.push_back(grpCachePair(grpId, cId));
-					mUpdateCacheDoc = true;
-				}
-			}else
-			{
-#ifdef DISTRIB_HISTORY_DEBUG
-				std::cerr << "p3GroupDistrib::loadFileMsgs()"
-						<< "\nNo msg cache present, not loading"
-						<< std::endl;
-#endif
-			}
+			grpId = newMsg->grpId;
+			if(loadMsg(newMsg, src, local, historical))
+			{
+				if(cache)
+				{
+					msgCacheMap.insert(grpCachePair(grpId, pCacheId(src, cacheSubId)));
+				}
+			}
 		}
@@ -975,6 +1132,29 @@ void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical)
 		}
 	}
+	std::map<std::string, pCacheId>::iterator mit;
+	if(cache){
+		RsStackMutex stack(distribMtx);
+		mit = msgCacheMap.begin();
+		for(; mit != msgCacheMap.end(); mit++)
+		{
+			mMsgHistPending.push_back(grpCachePair(mit->first, mit->second));
+		}
+		mUpdateCacheDoc = true;
+		if(!msgCacheMap.empty())
+			mCount++;
+		std::string failedCacheId = FAILED_CACHE_CONT;
+		// if the msg cache map is empty then the cache id failed
+		if(msgCacheMap.empty())
+			mMsgHistPending.push_back(grpCachePair(failedCacheId, failedCache));
+	}
 	if (local)
 	{
@@ -1012,6 +1192,7 @@ void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical)
 	return;
 }
+//TODO make carbon copy of sister
 void p3GroupDistrib::locked_loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical)
 {
@@ -1046,16 +1227,7 @@ void p3GroupDistrib::locked_loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical)
 		if ((newMsg = dynamic_cast<RsDistribSignedMsg *>(item)))
 		{
-			std::string grpId = newMsg->grpId;
-			bool cached = false;
-			pCacheId cId(src, cacheSubId);
-			// if msg cache not present in table or not historical then load immediately
-			if(!locked_historyCached(newMsg->grpId, cId, cached) || !historical){
 			locked_loadMsg(newMsg, src, local, historical);
-			}
 		}
 		else
 		{
@@ -2050,6 +2222,8 @@ bool p3GroupDistrib::getAllMsgList(std::string grpId, std::list<std::string> &msgIds)
 	RsStackMutex stack(distribMtx); /************* STACK MUTEX ************/
+	locked_processHistoryCached(grpId);
 	std::map<std::string, GroupInfo>::iterator git;
 	if (mGroups.end() == (git = mGroups.find(grpId)))
 	{
@@ -2070,6 +2244,9 @@ bool p3GroupDistrib::getParentMsgList(std::string grpId, std::string pId,
 		std::list<std::string> &msgIds)
 {
 	RsStackMutex stack(distribMtx); /************* STACK MUTEX ************/
+	locked_processHistoryCached(grpId);
 	std::map<std::string, GroupInfo>::iterator git;
 	if (mGroups.end() == (git = mGroups.find(grpId)))
 	{
@@ -2114,10 +2291,6 @@ bool p3GroupDistrib::getTimePeriodMsgList(std::string grpId, uint32_t timeMin,
 GroupInfo *p3GroupDistrib::locked_getGroupInfo(std::string grpId)
 {
-#ifdef ENABLE_CACHE_OPT
-	locked_processHistoryCached(grpId);
-#endif
 	/************* ALREADY LOCKED ************/
 	std::map<std::string, GroupInfo>::iterator git;
 	if (mGroups.end() == (git = mGroups.find(grpId)))
@@ -2133,6 +2306,8 @@ RsDistribMsg *p3GroupDistrib::locked_getGroupMsg(std::string grpId, std::string msgId)
 	/************* ALREADY LOCKED ************/
+	locked_processHistoryCached(grpId);
 	std::map<std::string, GroupInfo>::iterator git;
 	if (mGroups.end() == (git = mGroups.find(grpId)))
 	{
@@ -2440,12 +2615,10 @@ bool p3GroupDistrib::saveList(bool &cleanup, std::list<RsItem *>& saveData)
 	}
 	delete childSer;
-	std::string histCacheFile = mKeyBackUpDir + "/" + "grp_history.xml";
 	// now save history doc
-#ifdef ENABLE_CACHE_OPT
-	mCacheDoc.save_file(histCacheFile.c_str());
-#endif
+	locked_saveHistoryCacheFile();
 	return true;
 }
@@ -2482,6 +2655,7 @@ bool p3GroupDistrib::loadList(std::list<RsItem *>& load)
 	RsDistribSignedMsg *newMsg = NULL;
 	RsDistribConfigData* newChildConfig = NULL;
 	if ((newGrp = dynamic_cast<RsDistribGrp *>(*lit)))
 	{
 		const std::string &gid = newGrp -> grpId;
@@ -2527,11 +2701,10 @@ bool p3GroupDistrib::loadList(std::list<RsItem *>& load)
 	mGroupsRepublish = false;
 	delete childSer;
-	std::string histCacheFile = mKeyBackUpDir + "/" + "grp_history.xml";
 #ifdef ENABLE_CACHE_OPT
-	mCacheDoc.load_file(histCacheFile.c_str());
-	buildCacheTable();
+	if(locked_loadHistoryCacheFile())
+		locked_buildCacheTable();
 #endif
 	return true;
@@ -4820,11 +4993,15 @@ bool p3GroupDistrib::locked_printDummyMsgs(GroupInfo &grp)
 bool p3GroupDistrib::getDummyParentMsgList(std::string grpId, std::string pId, std::list<std::string> &msgIds)
 {
 #ifdef DISTRIB_DUMMYMSG_DEBUG
 	std::cerr << "p3GroupDistrib::getDummyParentMsgList(grpId:" << grpId << "," << pId << ")";
 	std::cerr << std::endl;
 #endif
 	RsStackMutex stack(distribMtx); /************* STACK MUTEX ************/
+	// load grp from history cache if not already loaded
+	locked_processHistoryCached(grpId);
 	std::map<std::string, GroupInfo>::iterator git;
 	if (mGroups.end() == (git = mGroups.find(grpId)))
 	{
@@ -4855,6 +5032,8 @@ bool p3GroupDistrib::getDummyParentMsgList(std::string grpId, std::string pId, std::list<std::string> &msgIds)
 RsDistribDummyMsg *p3GroupDistrib::locked_getGroupDummyMsg(std::string grpId, std::string msgId)
 {
+	locked_processHistoryCached(grpId);
 #ifdef DISTRIB_DUMMYMSG_DEBUG
 	std::cerr << "p3GroupDistrib::locked_getGroupDummyMsg(grpId:" << grpId << "," << msgId << ")";
 	std::cerr << std::endl;

libretroshare/src/services

@@ -341,6 +341,11 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu
 	private:
+		/*!
+		 * called when all historical caches have been loaded
+		 */
+		void HistoricalCachesLoaded();
 		/*!
 		 * This updates the cache document with pending msg and grp cache data
 		 */
@@ -378,18 +383,16 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu
 		bool locked_historyCached(const std::string& grpId, bool& cached);
 		/*!
-		 * @param id of grp msg belongs to
-		 * @param cache id of msg
-		 * @param on return this is false if msg has not been cached and vice versa
+		 * @param cId the cache data id to look up
 		 * @return false if cache entry does not exist in table
 		 */
-		bool locked_historyCached(const std::string& grpId, const pCacheId& cId, bool& cached);
+		bool locked_historyCached(const pCacheId& cId);
 		/*!
 		 * builds cache table from loaded cache document
 		 * @return false if cache document is empty
 		 */
-		bool buildCacheTable(void);
+		bool locked_buildCacheTable(void);
 		/*!
 		 * if grp's message is not loaded, load it, and update cache table
@@ -405,6 +408,16 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu
 		 */
 		void locked_getHistoryCacheData(const std::string& grpId, std::list<CacheData>& cDataSet);
+		/*!
+		 * encrypts and saves cache file
+		 */
+		bool locked_saveHistoryCacheFile();
+		/*!
+		 * decrypts and loads cache file
+		 */
+		bool locked_loadHistoryCacheFile();
 	private:
 		/* these lists are filled by the overloaded fns... then cleared by the thread */
@@ -858,17 +871,19 @@ RsDistribDummyMsg *locked_getGroupDummyMsg(std::string grpId, std::string msgId)
 		time_t mLastKeyPublishTime, mLastRecvdKeyTime;
 		////////////// cache optimisation ////////////////
+		int mCount;
 		/// table containing new grp cache data to be added to xml doc ( grpid, (cid,pid) )
 		std::vector<grpCachePair> mGrpHistPending;
 		/// table containing new msg cache data to be added to xml doc ( grpid, (cid,pid) )
 		std::vector<grpCachePair> mMsgHistPending;
+		std::set<pCacheId> mCachePairsInTable, mCacheFailedTable;
+		std::list<CacheDataPending> mPendingHistCaches;
 		time_t mLastCacheDocUpdate;
-		bool mUpdateCacheDoc;
+		bool mUpdateCacheDoc, mHistoricalCachesLoaded;
 		std::map<std::string, nodeCache> mCacheTable; // (cid, node)