diff --git a/libretroshare/src/services/p3distrib.cc b/libretroshare/src/services/p3distrib.cc index b5f497eb0..88046f58b 100644 --- a/libretroshare/src/services/p3distrib.cc +++ b/libretroshare/src/services/p3distrib.cc @@ -204,7 +204,7 @@ int p3GroupDistrib::tick() bool updateCacheDoc = false; { RsStackMutex stack(distribMtx); - updateCacheDoc = (now > (time_t) (mLastCacheDocUpdate + 30)); + updateCacheDoc = (now > (time_t) (mLastCacheDocUpdate + 10)); updateCacheDoc &= !mHistoricalCaches && mUpdateCacheDoc && mHistoricalCachesLoaded; #ifdef DISTRIB_HISTORY_DEBUG std::cerr << "num pending grps: " << mGrpHistPending.size() << std::endl; @@ -647,12 +647,9 @@ bool p3GroupDistrib::locked_buildCacheTable(){ return true; } -void p3GroupDistrib::locked_processHistoryCached(const std::string& grpId) +void p3GroupDistrib::processHistoryCached(const std::string& grpId) { - // no processing should be done until cache locations have been stored in memory - if(mHistoricalCaches) - return; #ifdef DISTRIB_HISTORY_DEBUG std::cerr << "p3GroupDistrib::locked_processHistoryCached() " @@ -660,7 +657,15 @@ void p3GroupDistrib::locked_processHistoryCached(const std::string& grpId) #endif bool cached = true; - locked_historyCached(grpId, cached); + { + RsStackMutex stack(distribMtx); + + // no processing should be done until cache locations have been stored in memory + if(mHistoricalCaches) + return; + + locked_historyCached(grpId, cached); + } std::list cDataList; std::list::iterator cit; @@ -671,14 +676,24 @@ void p3GroupDistrib::locked_processHistoryCached(const std::string& grpId) // if not history cached then load it if(!cached) { + + // get list of cache id belonging to grp - locked_getHistoryCacheData(grpId, cDataList); - cit = cDataList.begin(); + { + RsStackMutex stack(distribMtx); + locked_getHistoryCacheData(grpId, cDataList); - for(; cit != cDataList.end(); cit++){ + cit = cDataList.begin(); + + for(; cit != cDataList.end(); cit++){ + cit->cid.type = cacheType; + locked_getStoredCache(*cit); + } + } + + // now load + for(cit = cDataList.begin(); cit != cDataList.end(); cit++){ - cit->cid.type = cacheType; - locked_getStoredCache(*cit); file = cit->path; file += "/"; file += cit->name; @@ -686,13 +701,16 @@ void p3GroupDistrib::locked_processHistoryCached(const std::string& grpId) // note: you could load msgs for a cache historied group that is not loaded, // but any request for info of affected grp will consequently load // all its msgs through this function anyways - locked_loadFileMsgs(file, cit->cid.subid, cit->pid, cit->recvd, (cit->pid == mOwnId), true); + loadFileMsgs(file, cit->cid.subid, cit->pid, cit->recvd, (cit->pid == mOwnId), true, true); } - } - locked_updateCacheTableEntry(grpId, true); + { + RsStackMutex stack(distribMtx); + locked_updateCacheTableEntry(grpId, true); + } + return; } @@ -992,7 +1010,7 @@ int p3GroupDistrib::loadAnyCache(const CacheData &data, bool local, bool his } else { - loadFileMsgs(file, data.cid.subid, data.pid, data.recvd, local, historical); + loadFileMsgs(file, data.cid.subid, data.pid, data.recvd, local, historical, false); } return true; @@ -1083,7 +1101,8 @@ void p3GroupDistrib::loadFileGroups(const std::string &filename, const std::stri return; } -void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical) +void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical, + bool cacheLoad) { #ifdef DISTRIB_DEBUG @@ -1098,14 +1117,21 @@ void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSub // if cache id exists in cache table exit { RsStackMutex stack(distribMtx); + // if this is a cache load proceed if not check + // cache id exists in cache table, if so don't load - if(locked_historyCached(pCacheId(src, cacheSubId))){ - return; - } - else + if(!cacheLoad) { - cache = true; + if(historical && locked_historyCached(pCacheId(src, cacheSubId))) + { + return; + } + else + { + cache = true; + } } + } #endif @@ -1216,133 +1242,6 @@ void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSub return; } -//TODO make carbon copy of sister -void p3GroupDistrib::locked_loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical) -{ - -#ifdef DISTRIB_DEBUG - std::cerr << "p3GroupDistrib::loadFileMsgs()"; - std::cerr << std::endl; -#endif - - time_t now = time(NULL); - bool cache = false; - -#ifdef ENABLE_CACHE_OPT - // if cache id exists in cache table exit - if(!historical){ - if(locked_historyCached(pCacheId(src, cacheSubId))){ - return; - } - else - { - cache = true; - } - } -#endif - - // link grp to cache id (only one cache id, so doesn't matter if one grp comes out twice - // with same cache id) - std::map msgCacheMap; - pCacheId failedCache = pCacheId(src, cacheSubId); - /* create the serialiser to load msgs */ - BinInterface *bio = new BinFileInterface(filename.c_str(), BIN_FLAGS_READABLE); - pqistore *store = createStore(bio, src, BIN_FLAGS_READABLE); - -#ifdef DISTRIB_DEBUG - std::cerr << "loading file " << filename << std::endl ; -#endif - - RsItem *item; - RsDistribSignedMsg *newMsg; - std::string grpId; - - while(NULL != (item = store->GetItem())) - { -#ifdef DISTRIB_DEBUG - std::cerr << "p3GroupDistrib::loadFileMsgs() Got Item:"; - std::cerr << std::endl; - item->print(std::cerr, 10); - std::cerr << std::endl; -#endif - - if ((newMsg = dynamic_cast(item))) - { - grpId = newMsg->grpId; - if(locked_loadMsg(newMsg, src, local, historical)) - { - if(cache) - { - msgCacheMap.insert(grpCachePair(grpId, pCacheId(src, cacheSubId))); - } - } - } - else - { -#ifdef DISTRIB_DEBUG - std::cerr << "p3GroupDistrib::loadFileMsgs() Unexpected Item - deleting"; - std::cerr << std::endl; -#endif - /* wrong message type */ - delete item; - } - } - - std::map::iterator mit; - - if(cache){ - - mit = msgCacheMap.begin(); - for(;mit != msgCacheMap.end(); mit++) - { - mMsgHistPending.push_back(grpCachePair(mit->first, mit->second)); - } - mUpdateCacheDoc = true; - if(!msgCacheMap.empty()) - mCount++; - - std::string failedCacheId = FAILED_CACHE_CONT; - - // if msg cache map is empty then cache id failed - if(msgCacheMap.empty()) - mMsgHistPending.push_back(grpCachePair(failedCacheId, failedCache)); - } - - if (local) - { - /* now we create a map of time -> subid - * This is used to determine the newest and the oldest items - */ -#ifdef DISTRIB_DEBUG - std::cerr << "p3GroupDistrib::loadFileMsgs() Updating Local TimeStamps"; - std::cerr << std::endl; - std::cerr << "p3GroupDistrib::loadFileMsgs() CacheSubId: " << cacheSubId << " recvd: " << ts; - std::cerr << std::endl; -#endif - - mLocalCacheTs[ts] = cacheSubId; - if (cacheSubId > mMaxCacheSubId) - { -#ifdef DISTRIB_DEBUG - std::cerr << "p3GroupDistrib::loadFileMsgs() New Max CacheSubId"; - std::cerr << std::endl; -#endif - mMaxCacheSubId = cacheSubId; - } - - if (((time_t) ts < now) && ((time_t) ts > mLastPublishTime)) - { -#ifdef DISTRIB_DEBUG - std::cerr << "p3GroupDistrib::loadFileMsgs() New LastPublishTime"; - std::cerr << std::endl; -#endif - mLastPublishTime = ts; - } - } - - delete store; - return; -} /***************************************************************************************/ /***************************************************************************************/ /********************** load Cache Msgs ***************************************/ @@ -2306,11 +2205,13 @@ void p3GroupDistrib::getPopularGroupList(uint32_t popMin, uint32_t popMax, std:: bool p3GroupDistrib::getAllMsgList(const std::string& grpId, std::list &msgIds) { +#ifdef ENABLE_CACHE_OPT + processHistoryCached(grpId); +#endif + RsStackMutex stack(distribMtx); /************* STACK MUTEX ************/ -#ifdef ENABLE_CACHE_OPT - locked_processHistoryCached(grpId); -#endif + std::map::iterator git; if (mGroups.end() == (git = mGroups.find(grpId))) @@ -2331,12 +2232,14 @@ bool p3GroupDistrib::getAllMsgList(const std::string& grpId, std::list &msgIds) { - RsStackMutex stack(distribMtx); /************* STACK MUTEX ************/ #ifdef ENABLE_CACHE_OPT - locked_processHistoryCached(grpId); + processHistoryCached(grpId); #endif + RsStackMutex stack(distribMtx); /************* STACK MUTEX ************/ + + std::map::iterator git; if (mGroups.end() == (git = mGroups.find(grpId))) { @@ -2397,7 +2300,7 @@ RsDistribMsg *p3GroupDistrib::locked_getGroupMsg(const std::string& grpId, const /************* ALREADY LOCKED ************/ #ifdef ENABLE_CACHE_OPT - locked_processHistoryCached(grpId); +// processHistoryCached(grpId); #endif std::map::iterator git; @@ -5101,17 +5004,17 @@ bool p3GroupDistrib::locked_printDummyMsgs(GroupInfo &grp) bool p3GroupDistrib::getDummyParentMsgList(const std::string& grpId, const std::string& pId, std::list &msgIds) { + // load grp from history cache if not already loaded +#ifdef ENABLE_CACHE_OPT + processHistoryCached(grpId); +#endif + #ifdef DISTRIB_DUMMYMSG_DEBUG std::cerr << "p3GroupDistrib::getDummyParentMsgList(grpId:" << grpId << "," << pId << ")"; std::cerr << std::endl; #endif RsStackMutex stack(distribMtx); /************* STACK MUTEX ************/ - // load grp from history cache if not already loaded -#ifdef ENABLE_CACHE_OPT - locked_processHistoryCached(grpId); -#endif - std::map::iterator git; if (mGroups.end() == (git = mGroups.find(grpId))) { @@ -5144,7 +5047,7 @@ RsDistribDummyMsg *p3GroupDistrib::locked_getGroupDummyMsg(const std::string& gr { #ifdef ENABLE_CACHE_OPT - locked_processHistoryCached(grpId); +// processHistoryCached(grpId); #endif #ifdef DISTRIB_DUMMYMSG_DEBUG diff --git a/libretroshare/src/services/p3distrib.h b/libretroshare/src/services/p3distrib.h index 738fd1c74..cdc84808f 100644 --- a/libretroshare/src/services/p3distrib.h +++ b/libretroshare/src/services/p3distrib.h @@ -394,12 +394,6 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu */ bool locked_buildCacheTable(void); - /*! - * if grp's message is not loaded, load it, and update cache table - * @param grpId group whose messages to load if not cached - */ - void locked_processHistoryCached(const std::string& grpId); - /*! * loads cache data which contains location of cache files belonging @@ -430,8 +424,18 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu /* load cache files */ void loadFileGroups(const std::string &filename, const std::string &src, bool local, bool historical, const pCacheId& cid); - void loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical); - void locked_loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical); + + /*! + * @param filename absolute cache file path + * @param cacheSubId cache subid, needed to save cache to history file + * @param src peer src id + * @param ts timestamp + * @param local set to whether it islocal or remote cache + * @param historical set to whether it is an old cache + * @param cacheLoad is a history cache opt load, prevent adding to cache history again + */ + void loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, + bool local, bool historical, bool cacheLoad); bool backUpKeys(const std::list &keysToBackUp, std::string grpId); void locked_sharePubKey(); @@ -478,6 +482,11 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu bool loadGroupKey(RsDistribGrpKey *newKey, bool historical); + /*! + * if grp's message is not loaded, load it, and update cache table + * @param grpId group whose messages to load if not cached + */ + void processHistoryCached(const std::string& grpId); /***************************************************************************************/ /***************************************************************************************/ diff --git a/libretroshare/src/services/p3forums.cc b/libretroshare/src/services/p3forums.cc index 2687ec866..fecc59859 100644 --- a/libretroshare/src/services/p3forums.cc +++ b/libretroshare/src/services/p3forums.cc @@ -294,6 +294,7 @@ bool p3Forums::getForumThreadMsgList(const std::string &fId, const std::string & bool p3Forums::getForumMessage(const std::string &fId, const std::string &mId, ForumMsgInfo &info) { + processHistoryCached(fId); RsStackMutex stack(distribMtx); /***** STACK LOCKED MUTEX *****/ RsDistribMsg *msg = locked_getGroupMsg(fId, mId); @@ -549,33 +550,42 @@ bool p3Forums::getMessageCount(const std::string &fId, unsigned int &newCount, u } /******* UNLOCKED ********/ if (grpFlags & (RS_DISTRIB_ADMIN | RS_DISTRIB_SUBSCRIBED)) { + std::list msgIds; + if (getAllMsgList(fId, msgIds)) { // get msg ids without causing a costly cache load - RsStackMutex stack(distribMtx); /***** STACK LOCKED MUTEX *****/ + RsStackMutex stack(distribMtx); /***** STACK LOCKED MUTEX *****/ - std::map::iterator fit = mReadStatus.find(fId); - if (fit == mReadStatus.end()) { - // not status available - continue; - } + std::map::iterator fit = mReadStatus.find(fId); + if (fit == mReadStatus.end()) { + // no status available -> all messages are new + newCount += msgIds.size(); + unreadCount += msgIds.size(); + continue; + } - // iterator through read status map to determine number of new and old - std::map::iterator rit = fit->second->msgReadStatus.begin(); - for(; rit != fit->second->msgReadStatus.end(); rit++) - { - if(rit->second & FORUM_MSG_STATUS_READ) - { - if (rit->second & FORUM_MSG_STATUS_UNREAD_BY_USER) { - // message is unread + std::list::iterator mit; + for (mit = msgIds.begin(); mit != msgIds.end(); mit++) { + std::map::iterator rit = fit->second->msgReadStatus.find(*mit); + + if (rit == fit->second->msgReadStatus.end()) { + // no status available -> message is new + newCount++; + unreadCount++; + continue; + } + + if (rit->second & FORUM_MSG_STATUS_READ) { + // message is not new + if (rit->second & FORUM_MSG_STATUS_UNREAD_BY_USER) { + // message is unread + unreadCount++; + } + } else { + newCount++; unreadCount++; } } - else - { - newCount++; - unreadCount++; - } - } - /******* UNLOCKED ********/ + } /******* UNLOCKED ********/ } }