Increase the granularity of stack-mutex locking needed to process cached groups.

This gets rid of the delay in loading the cache that was coupled with
the previous QThread-based loading in forums.
Removed the unnecessary locked_loadMsg function.

git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-cacheopt@4413 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
chrisparker126 2011-07-09 15:49:58 +00:00
parent f7e3a3f1b3
commit 70fe48a7e7
3 changed files with 111 additions and 189 deletions

View file

@ -204,7 +204,7 @@ int p3GroupDistrib::tick()
bool updateCacheDoc = false;
{
RsStackMutex stack(distribMtx);
updateCacheDoc = (now > (time_t) (mLastCacheDocUpdate + 30));
updateCacheDoc = (now > (time_t) (mLastCacheDocUpdate + 10));
updateCacheDoc &= !mHistoricalCaches && mUpdateCacheDoc && mHistoricalCachesLoaded;
#ifdef DISTRIB_HISTORY_DEBUG
std::cerr << "num pending grps: " << mGrpHistPending.size() << std::endl;
@ -647,12 +647,9 @@ bool p3GroupDistrib::locked_buildCacheTable(){
return true;
}
void p3GroupDistrib::locked_processHistoryCached(const std::string& grpId)
void p3GroupDistrib::processHistoryCached(const std::string& grpId)
{
// no processing should be done until cache locations have been stored in memory
if(mHistoricalCaches)
return;
#ifdef DISTRIB_HISTORY_DEBUG
std::cerr << "p3GroupDistrib::locked_processHistoryCached() "
@ -660,7 +657,15 @@ void p3GroupDistrib::locked_processHistoryCached(const std::string& grpId)
#endif
bool cached = true;
locked_historyCached(grpId, cached);
{
RsStackMutex stack(distribMtx);
// no processing should be done until cache locations have been stored in memory
if(mHistoricalCaches)
return;
locked_historyCached(grpId, cached);
}
std::list<CacheData> cDataList;
std::list<CacheData>::iterator cit;
@ -671,14 +676,24 @@ void p3GroupDistrib::locked_processHistoryCached(const std::string& grpId)
// if not history cached then load it
if(!cached)
{
// get list of cache id belonging to grp
locked_getHistoryCacheData(grpId, cDataList);
cit = cDataList.begin();
{
RsStackMutex stack(distribMtx);
locked_getHistoryCacheData(grpId, cDataList);
for(; cit != cDataList.end(); cit++){
cit = cDataList.begin();
for(; cit != cDataList.end(); cit++){
cit->cid.type = cacheType;
locked_getStoredCache(*cit);
}
}
// now load
for(cit = cDataList.begin(); cit != cDataList.end(); cit++){
cit->cid.type = cacheType;
locked_getStoredCache(*cit);
file = cit->path;
file += "/";
file += cit->name;
@ -686,13 +701,16 @@ void p3GroupDistrib::locked_processHistoryCached(const std::string& grpId)
// note: you could load msgs for a cache historied group that is not loaded,
// but any request for info of affected grp will consequently load
// all its msgs through this function anyways
locked_loadFileMsgs(file, cit->cid.subid, cit->pid, cit->recvd, (cit->pid == mOwnId), true);
loadFileMsgs(file, cit->cid.subid, cit->pid, cit->recvd, (cit->pid == mOwnId), true, true);
}
}
locked_updateCacheTableEntry(grpId, true);
{
RsStackMutex stack(distribMtx);
locked_updateCacheTableEntry(grpId, true);
}
return;
}
@ -992,7 +1010,7 @@ int p3GroupDistrib::loadAnyCache(const CacheData &data, bool local, bool his
}
else
{
loadFileMsgs(file, data.cid.subid, data.pid, data.recvd, local, historical);
loadFileMsgs(file, data.cid.subid, data.pid, data.recvd, local, historical, false);
}
return true;
@ -1083,7 +1101,8 @@ void p3GroupDistrib::loadFileGroups(const std::string &filename, const std::stri
return;
}
void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical)
void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical,
bool cacheLoad)
{
#ifdef DISTRIB_DEBUG
@ -1098,14 +1117,21 @@ void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSub
// if cache id exists in cache table exit
{
RsStackMutex stack(distribMtx);
// if this is a cache load proceed if not check
// cache id exists in cache table, if so don't load
if(locked_historyCached(pCacheId(src, cacheSubId))){
return;
}
else
if(!cacheLoad)
{
cache = true;
if(historical && locked_historyCached(pCacheId(src, cacheSubId)))
{
return;
}
else
{
cache = true;
}
}
}
#endif
@ -1216,133 +1242,6 @@ void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSub
return;
}
//TODO make carbon copy of sister
/**
 * Loads all signed messages from a single cache file and records cache
 * history / local publish bookkeeping.
 *
 * NOTE(review): the "locked_" prefix suggests the caller must already hold
 * distribMtx — confirm against the class's locking convention; no lock is
 * taken inside this function.
 *
 * @param filename   absolute path of the cache file to read
 * @param cacheSubId cache sub-id, used for history bookkeeping and (if local)
 *                   tracking the max sub-id / publish timestamps
 * @param src        peer id the cache came from
 * @param ts         cache timestamp (only used for local caches)
 * @param local      true if this is one of our own caches
 * @param historical true if this is an old cache (skips the history-cache
 *                   short-circuit below)
 */
void p3GroupDistrib::locked_loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical)
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadFileMsgs()";
std::cerr << std::endl;
#endif
time_t now = time(NULL);
bool cache = false;
#ifdef ENABLE_CACHE_OPT
// if cache id exists in cache table exit
// Non-historical caches already present in the history table were loaded
// before; bail out early. Otherwise flag that this load should be recorded.
if(!historical){
if(locked_historyCached(pCacheId(src, cacheSubId))){
return;
}
else
{
cache = true;
}
}
#endif
// link grp to cache id (only one cache id, so doesn't matter if one grp comes out twice
// with same cache id)
std::map<std::string, pCacheId> msgCacheMap;
// Sentinel pair recorded below if no message in this cache loads successfully.
pCacheId failedCache = pCacheId(src, cacheSubId);
/* create the serialiser to load msgs */
// NOTE(review): bio ownership appears to pass to the pqistore; only `store`
// is deleted at the end — confirm createStore() takes ownership of bio.
BinInterface *bio = new BinFileInterface(filename.c_str(), BIN_FLAGS_READABLE);
pqistore *store = createStore(bio, src, BIN_FLAGS_READABLE);
#ifdef DISTRIB_DEBUG
std::cerr << "loading file " << filename << std::endl ;
#endif
RsItem *item;
RsDistribSignedMsg *newMsg;
std::string grpId;
// Drain every item from the cache file; only RsDistribSignedMsg items are
// expected, anything else is discarded.
while(NULL != (item = store->GetItem()))
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadFileMsgs() Got Item:";
std::cerr << std::endl;
item->print(std::cerr, 10);
std::cerr << std::endl;
#endif
if ((newMsg = dynamic_cast<RsDistribSignedMsg *>(item)))
{
grpId = newMsg->grpId;
// locked_loadMsg presumably takes ownership of newMsg and returns
// whether the message was accepted — TODO confirm.
if(locked_loadMsg(newMsg, src, local, historical))
{
if(cache)
{
// remember which group this cache id provided a message for
msgCacheMap.insert(grpCachePair(grpId, pCacheId(src, cacheSubId)));
}
}
}
else
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadFileMsgs() Unexpected Item - deleting";
std::cerr << std::endl;
#endif
/* wrong message type */
delete item;
}
}
std::map<std::string, pCacheId>::iterator mit;
// Record the (group, cache) associations so the cache-history document can
// be updated; an empty map means the whole cache failed to yield messages.
if(cache){
mit = msgCacheMap.begin();
for(;mit != msgCacheMap.end(); mit++)
{
mMsgHistPending.push_back(grpCachePair(mit->first, mit->second));
}
mUpdateCacheDoc = true;
if(!msgCacheMap.empty())
mCount++;
std::string failedCacheId = FAILED_CACHE_CONT;
// if msg cache map is empty then cache id failed
if(msgCacheMap.empty())
mMsgHistPending.push_back(grpCachePair(failedCacheId, failedCache));
}
if (local)
{
/* now we create a map of time -> subid
* This is used to determine the newest and the oldest items
*/
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadFileMsgs() Updating Local TimeStamps";
std::cerr << std::endl;
std::cerr << "p3GroupDistrib::loadFileMsgs() CacheSubId: " << cacheSubId << " recvd: " << ts;
std::cerr << std::endl;
#endif
mLocalCacheTs[ts] = cacheSubId;
if (cacheSubId > mMaxCacheSubId)
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadFileMsgs() New Max CacheSubId";
std::cerr << std::endl;
#endif
mMaxCacheSubId = cacheSubId;
}
// only advance the last-publish time for timestamps in the past
// (guards against clock-skewed future timestamps)
if (((time_t) ts < now) && ((time_t) ts > mLastPublishTime))
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadFileMsgs() New LastPublishTime";
std::cerr << std::endl;
#endif
mLastPublishTime = ts;
}
}
delete store;
return;
}
/***************************************************************************************/
/***************************************************************************************/
/********************** load Cache Msgs ***************************************/
@ -2306,11 +2205,13 @@ void p3GroupDistrib::getPopularGroupList(uint32_t popMin, uint32_t popMax, std::
bool p3GroupDistrib::getAllMsgList(const std::string& grpId, std::list<std::string> &msgIds)
{
#ifdef ENABLE_CACHE_OPT
processHistoryCached(grpId);
#endif
RsStackMutex stack(distribMtx); /************* STACK MUTEX ************/
#ifdef ENABLE_CACHE_OPT
locked_processHistoryCached(grpId);
#endif
std::map<std::string, GroupInfo>::iterator git;
if (mGroups.end() == (git = mGroups.find(grpId)))
@ -2331,12 +2232,14 @@ bool p3GroupDistrib::getAllMsgList(const std::string& grpId, std::list<std::stri
bool p3GroupDistrib::getParentMsgList(const std::string& grpId, const std::string& pId,
std::list<std::string> &msgIds)
{
RsStackMutex stack(distribMtx); /************* STACK MUTEX ************/
#ifdef ENABLE_CACHE_OPT
locked_processHistoryCached(grpId);
processHistoryCached(grpId);
#endif
RsStackMutex stack(distribMtx); /************* STACK MUTEX ************/
std::map<std::string, GroupInfo>::iterator git;
if (mGroups.end() == (git = mGroups.find(grpId)))
{
@ -2397,7 +2300,7 @@ RsDistribMsg *p3GroupDistrib::locked_getGroupMsg(const std::string& grpId, const
/************* ALREADY LOCKED ************/
#ifdef ENABLE_CACHE_OPT
locked_processHistoryCached(grpId);
// processHistoryCached(grpId);
#endif
std::map<std::string, GroupInfo>::iterator git;
@ -5101,17 +5004,17 @@ bool p3GroupDistrib::locked_printDummyMsgs(GroupInfo &grp)
bool p3GroupDistrib::getDummyParentMsgList(const std::string& grpId, const std::string& pId, std::list<std::string> &msgIds)
{
// load grp from history cache if not already loaded
#ifdef ENABLE_CACHE_OPT
processHistoryCached(grpId);
#endif
#ifdef DISTRIB_DUMMYMSG_DEBUG
std::cerr << "p3GroupDistrib::getDummyParentMsgList(grpId:" << grpId << "," << pId << ")";
std::cerr << std::endl;
#endif
RsStackMutex stack(distribMtx); /************* STACK MUTEX ************/
// load grp from history cache if not already loaded
#ifdef ENABLE_CACHE_OPT
locked_processHistoryCached(grpId);
#endif
std::map<std::string, GroupInfo>::iterator git;
if (mGroups.end() == (git = mGroups.find(grpId)))
{
@ -5144,7 +5047,7 @@ RsDistribDummyMsg *p3GroupDistrib::locked_getGroupDummyMsg(const std::string& gr
{
#ifdef ENABLE_CACHE_OPT
locked_processHistoryCached(grpId);
// processHistoryCached(grpId);
#endif
#ifdef DISTRIB_DUMMYMSG_DEBUG

View file

@ -394,12 +394,6 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu
*/
bool locked_buildCacheTable(void);
/*!
* if grp's message is not loaded, load it, and update cache table
* @param grpId group whose messages to load if not cached
*/
void locked_processHistoryCached(const std::string& grpId);
/*!
* loads cache data which contains location of cache files belonging
@ -430,8 +424,18 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu
/* load cache files */
void loadFileGroups(const std::string &filename, const std::string &src, bool local, bool historical, const pCacheId& cid);
void loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical);
void locked_loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical);
/*!
* @param filename absolute cache file path
* @param cacheSubId cache subid, needed to save cache to history file
* @param src peer src id
* @param ts timestamp
* @param local set to whether it islocal or remote cache
* @param historical set to whether it is an old cache
* @param cacheLoad is a history cache opt load, prevent adding to cache history again
*/
void loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts,
bool local, bool historical, bool cacheLoad);
bool backUpKeys(const std::list<RsDistribGrpKey* > &keysToBackUp, std::string grpId);
void locked_sharePubKey();
@ -478,6 +482,11 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu
bool loadGroupKey(RsDistribGrpKey *newKey, bool historical);
/*!
* if grp's message is not loaded, load it, and update cache table
* @param grpId group whose messages to load if not cached
*/
void processHistoryCached(const std::string& grpId);
/***************************************************************************************/
/***************************************************************************************/

View file

@ -294,6 +294,7 @@ bool p3Forums::getForumThreadMsgList(const std::string &fId, const std::string &
bool p3Forums::getForumMessage(const std::string &fId, const std::string &mId, ForumMsgInfo &info)
{
processHistoryCached(fId);
RsStackMutex stack(distribMtx); /***** STACK LOCKED MUTEX *****/
RsDistribMsg *msg = locked_getGroupMsg(fId, mId);
@ -549,33 +550,42 @@ bool p3Forums::getMessageCount(const std::string &fId, unsigned int &newCount, u
} /******* UNLOCKED ********/
if (grpFlags & (RS_DISTRIB_ADMIN | RS_DISTRIB_SUBSCRIBED)) {
std::list<std::string> msgIds;
if (getAllMsgList(fId, msgIds)) { // get msg ids without causing a costly cache load
RsStackMutex stack(distribMtx); /***** STACK LOCKED MUTEX *****/
RsStackMutex stack(distribMtx); /***** STACK LOCKED MUTEX *****/
std::map<std::string, RsForumReadStatus*>::iterator fit = mReadStatus.find(fId);
if (fit == mReadStatus.end()) {
// not status available
continue;
}
std::map<std::string, RsForumReadStatus*>::iterator fit = mReadStatus.find(fId);
if (fit == mReadStatus.end()) {
// no status available -> all messages are new
newCount += msgIds.size();
unreadCount += msgIds.size();
continue;
}
// iterator through read status map to determine number of new and old
std::map<std::string, uint32_t >::iterator rit = fit->second->msgReadStatus.begin();
for(; rit != fit->second->msgReadStatus.end(); rit++)
{
if(rit->second & FORUM_MSG_STATUS_READ)
{
if (rit->second & FORUM_MSG_STATUS_UNREAD_BY_USER) {
// message is unread
std::list<std::string>::iterator mit;
for (mit = msgIds.begin(); mit != msgIds.end(); mit++) {
std::map<std::string, uint32_t >::iterator rit = fit->second->msgReadStatus.find(*mit);
if (rit == fit->second->msgReadStatus.end()) {
// no status available -> message is new
newCount++;
unreadCount++;
continue;
}
if (rit->second & FORUM_MSG_STATUS_READ) {
// message is not new
if (rit->second & FORUM_MSG_STATUS_UNREAD_BY_USER) {
// message is unread
unreadCount++;
}
} else {
newCount++;
unreadCount++;
}
}
else
{
newCount++;
unreadCount++;
}
}
/******* UNLOCKED ********/
} /******* UNLOCKED ********/
}
}