From 2f315cc0fccffa7305b278caa587ce4b4de51839 Mon Sep 17 00:00:00 2001 From: chrisparker126 Date: Sun, 24 Jul 2011 18:00:50 +0000 Subject: [PATCH] msg history space optimisation branch git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-msghistory@4487 b45a01b8-16f6-495d-af2f-9b41ad6348cc --- libretroshare/src/libretroshare.pro | 5 +- libretroshare/src/pqi/p3cfgmgr.cc | 1 + libretroshare/src/pqi/pqistore.h | 16 + libretroshare/src/retroshare/rsforums.h | 12 + .../src/serialiser/rsdistribitems.cc | 142 +++++ libretroshare/src/serialiser/rsdistribitems.h | 29 + libretroshare/src/serialiser/rsforumitems.h | 2 +- libretroshare/src/services/p3blogs.cc | 6 +- libretroshare/src/services/p3channels.cc | 4 +- libretroshare/src/services/p3distrib.cc | 555 +++++++++++------- libretroshare/src/services/p3distrib.h | 109 +++- libretroshare/src/services/p3forums.cc | 11 +- retroshare-gui/src/RetroShare.pro | 2 +- retroshare-gui/src/gui/ForumsDialog.cpp | 10 +- retroshare-gui/src/gui/ForumsDialog.h | 1 + 15 files changed, 681 insertions(+), 224 deletions(-) diff --git a/libretroshare/src/libretroshare.pro b/libretroshare/src/libretroshare.pro index a5e44b380..4de31185f 100644 --- a/libretroshare/src/libretroshare.pro +++ b/libretroshare/src/libretroshare.pro @@ -1,14 +1,13 @@ TEMPLATE = lib #CONFIG += staticlib release #CONFIG += staticlib testnetwork -CONFIG += staticlib bitdht +CONFIG += staticlib bitdht debug CONFIG -= qt TARGET = retroshare # Beware: All data of the stripped services are lost #CONFIG += minimal -DEFINES *= PQI_DISABLE_TUNNEL -#ENABLE_CACHE_OPT +DEFINES *= PQI_DISABLE_TUNNEL ENABLE_CACHE_OPT minimal { CONFIG -= use_blogs diff --git a/libretroshare/src/pqi/p3cfgmgr.cc b/libretroshare/src/pqi/p3cfgmgr.cc index 255c6e86d..5dad9ec24 100644 --- a/libretroshare/src/pqi/p3cfgmgr.cc +++ b/libretroshare/src/pqi/p3cfgmgr.cc @@ -740,6 +740,7 @@ bool p3Config::loadAttempt(const std::string& cfgFname,const std::string& signFn uint32_t bioflags = BIN_FLAGS_HASH_DATA | BIN_FLAGS_READABLE; uint32_t stream_flags = BIN_FLAGS_READABLE; + // bio is cleaned up after stream goes out of scope BinEncryptedFileInterface *bio = new BinEncryptedFileInterface(cfgFname.c_str(), bioflags); pqiSSLstore stream(setupSerialiser(), "CONFIG", bio, stream_flags); diff --git a/libretroshare/src/pqi/pqistore.h b/libretroshare/src/pqi/pqistore.h index 0e1bc400c..4f1596a7a 100644 --- a/libretroshare/src/pqi/pqistore.h +++ b/libretroshare/src/pqi/pqistore.h @@ -41,7 +41,16 @@ class pqistore: public PQInterface { public: + + /*! + * bio passed must be valid throughout lifetime of a pqistore instance + * @param rss + * @param srcId + * @param bio_in pqistore deletes bio once constructor called + * @param bio_flagsin + */ pqistore(RsSerialiser *rss, const std::string &srcId, BinInterface *bio_in, int bio_flagsin); + virtual ~pqistore(); // PQInterface @@ -84,6 +93,13 @@ class pqiSSLstore: public pqistore public: + /*! + * + * @param rss + * @param srcId + * @param bio_in deleted once pqiSSLstore call its destructor + * @param bio_flagsin + */ pqiSSLstore(RsSerialiser *rss, std::string srcId, BinEncryptedFileInterface *bio_in, int bio_flagsin); virtual ~pqiSSLstore(); diff --git a/libretroshare/src/retroshare/rsforums.h b/libretroshare/src/retroshare/rsforums.h index 634dcc267..e1d4b62a1 100644 --- a/libretroshare/src/retroshare/rsforums.h +++ b/libretroshare/src/retroshare/rsforums.h @@ -115,6 +115,11 @@ std::ostream &operator<<(std::ostream &out, const ForumMsgInfo &info); class RsForums; extern RsForums *rsForums; +/*! + * @brief interface to rs forums, a distributed cache based service + * @note avoid requesting available messages until requested by the user, requesting any information + * on messages for a grpId is expensive memory wise. + */ class RsForums { public: @@ -148,6 +153,13 @@ virtual bool ForumMessageSend(ForumMsgInfo &info) = 0; virtual bool forumRestoreKeys(const std::string& fId) = 0; virtual bool forumSubscribe(const std::string &fId, bool subscribe) = 0; +/*! + * + * @param fId forumId to retrieve msg count for, leave as blank to get msg count for all subscribed forums + * @param newCount number new msgs + * @param unreadCount number of unread msgs + * @return true if successful, false otherwise + */ virtual bool getMessageCount(const std::string &fId, unsigned int &newCount, unsigned int &unreadCount) = 0; /****************************************/ diff --git a/libretroshare/src/serialiser/rsdistribitems.cc b/libretroshare/src/serialiser/rsdistribitems.cc index 5a4396c5d..33506e950 100644 --- a/libretroshare/src/serialiser/rsdistribitems.cc +++ b/libretroshare/src/serialiser/rsdistribitems.cc @@ -858,6 +858,134 @@ RsDistribConfigData *RsDistribSerialiser::deserialiseConfigData(void *data, uint } +uint32_t RsDistribSerialiser::sizeMsgHstry(RsDistribMsgHstry *item) +{ + uint32_t s = 8; /* header */ + + /* RsDistribSignedMsg stuff */ + s += GetTlvStringSize(item->grpId); + s += GetTlvStringSize(item->msgHstryFileHash); + s += GetTlvStringSize(item->msgHstryFilePath); + + return s; +} + +/* serialise the data to the buffer */ +bool RsDistribSerialiser::serialiseMsgHstry(RsDistribMsgHstry *item, void *data, uint32_t *pktsize) +{ +#ifdef RSSERIAL_DEBUG + std::cerr << "RsDistribSerialiser::serialiseMsgHstry()" << std::endl; +#endif + uint32_t tlvsize = sizeMsgHstry(item); + uint32_t offset = 0; + + if (*pktsize < tlvsize) + { +#ifdef RSSERIAL_DEBUG + std::cerr << "RsDistribSerialiser::serialiseMsgHstry() FAIL no space" << std::endl; +#endif + return false; /* not enough space */ + } + + *pktsize = tlvsize; + + bool ok = true; + + ok &= setRsItemHeader(data, tlvsize, item->PacketId(), tlvsize); + + /* skip the header */ + offset += 8; + + /* grpId */ + ok &= SetTlvString(data, tlvsize, &offset, TLV_TYPE_STR_GROUPID, item->grpId); + ok &= SetTlvString(data, tlvsize, &offset, TLV_TYPE_STR_HASH_SHA1, item->msgHstryFileHash); + ok &= SetTlvString(data, tlvsize, &offset, TLV_TYPE_STR_PATH, item->msgHstryFilePath); + + if (offset != tlvsize) + { +#ifdef RSSERIAL_DEBUG + std::cerr << "RsDistribSerialiser::serialiseMsgHstry() FAIL Size Error! " << std::endl; +#endif + ok = false; + } + +#ifdef RSSERIAL_DEBUG + if (!ok) + { + std::cerr << "RsDistribSerialiser::serialiseMsgHstry() NOK" << std::endl; + } +#endif + + return ok; +} + + +RsDistribMsgHstry *RsDistribSerialiser::deserialiseMsgHstry(void *data, uint32_t *pktsize) +{ +#ifdef RSSERIAL_DEBUG + std::cerr << "RsDistribSerialiser::deserialiseMsgHstry()" << std::endl; +#endif + /* get the type and size */ + uint32_t rstype = getRsItemId(data); + uint32_t rssize = getRsItemSize(data); + + uint32_t offset = 0; + + + if ((RS_PKT_VERSION_SERVICE != getRsItemVersion(rstype)) || + (SERVICE_TYPE != getRsItemService(rstype)) || + (RS_PKT_SUBTYPE_DISTRIB_MSG_HSTRY != getRsItemSubType(rstype))) + { +#ifdef RSSERIAL_DEBUG + std::cerr << "RsDistribSerialiser::deserialiseMsgHstry() Wrong Type" << std::endl; +#endif + return NULL; /* wrong type */ + } + + if (*pktsize < rssize) /* check size */ + { +#ifdef RSSERIAL_DEBUG + std::cerr << "RsDistribSerialiser::deserialiseMsgHstry() Wrong Size" << std::endl; +#endif + return NULL; /* not enough data */ + } + + /* set the packet length */ + *pktsize = rssize; + + bool ok = true; + + RsDistribMsgHstry* item = new RsDistribMsgHstry(); + /* skip the header */ + offset += 8; + + ok &= GetTlvString(data, rssize, &offset, TLV_TYPE_STR_GROUPID, item->grpId); + ok &= GetTlvString(data, rssize, &offset, TLV_TYPE_STR_HASH_SHA1, item->msgHstryFileHash); + ok &= GetTlvString(data, rssize, &offset, TLV_TYPE_STR_PATH, item->msgHstryFilePath); + + + if (offset != rssize) + { +#ifdef RSSERIAL_DEBUG + std::cerr << "RsDistribSerialiser::deserialiseMsgHstry() size mismatch" << std::endl; +#endif + /* error */ + delete item; + return NULL; + } + + if (!ok) + { +#ifdef RSSERIAL_DEBUG + std::cerr << "RsDistribSerialiser::deserialiseMsgHstry() NOK" << std::endl; +#endif + delete item; + return NULL; + } + + return item; +} + @@ -867,6 +995,8 @@ uint32_t RsDistribSerialiser::size(RsItem *i) RsDistribGrpKey *dgk; RsDistribSignedMsg *dsm; RsDistribConfigData *dsd; + RsDistribMsgHstry *dmh; + /* in order of frequency */ if (NULL != (dsm = dynamic_cast(i))) @@ -885,6 +1015,10 @@ uint32_t RsDistribSerialiser::size(RsItem *i) { return sizeConfigData(dsd); } + else if(NULL != (dmh = dynamic_cast(i))) + { + return sizeMsgHstry(dmh); + } return 0; } @@ -898,6 +1032,7 @@ bool RsDistribSerialiser::serialise(RsItem *i, void *data, uint32_t *pktsize RsDistribGrpKey *dgk; RsDistribSignedMsg *dsm; RsDistribConfigData *dsd; + RsDistribMsgHstry *dmh; if (NULL != (dsm = dynamic_cast(i))) { @@ -915,6 +1050,10 @@ bool RsDistribSerialiser::serialise(RsItem *i, void *data, uint32_t *pktsize { return serialiseConfigData(dsd, data, pktsize); } + else if(NULL != (dsd = dynamic_cast(i))) + { + return serialiseMsgHstry(dmh, data, pktsize); + } return false; } @@ -946,6 +1085,9 @@ RsItem *RsDistribSerialiser::deserialise(void *data, uint32_t *pktsize) case RS_PKT_SUBTYPE_DISTRIB_CONFIG_DATA: return deserialiseConfigData(data, pktsize); break; + case RS_PKT_SUBTYPE_DISTRIB_MSG_HSTRY: + return deserialiseMsgHstry(data, pktsize); + break; default: return NULL; break; diff --git a/libretroshare/src/serialiser/rsdistribitems.h b/libretroshare/src/serialiser/rsdistribitems.h index 33681de10..619fd62dc 100644 --- a/libretroshare/src/serialiser/rsdistribitems.h +++ b/libretroshare/src/serialiser/rsdistribitems.h @@ -38,6 +38,7 @@ const uint8_t RS_PKT_SUBTYPE_DISTRIB_GRP = 0x01; const uint8_t RS_PKT_SUBTYPE_DISTRIB_GRP_KEY = 0x02; const uint8_t RS_PKT_SUBTYPE_DISTRIB_SIGNED_MSG = 0x03; const uint8_t RS_PKT_SUBTYPE_DISTRIB_CONFIG_DATA = 0x04; +const uint8_t RS_PKT_SUBTYPE_DISTRIB_MSG_HSTRY = 0x10; /**************************************************************************/ @@ -209,6 +210,30 @@ virtual std::ostream& print(std::ostream &out, uint16_t indent = 0); RsTlvSecurityKey key; }; +/*! + * for storing file to archived msgs of a group + */ +class RsDistribMsgHstry: public RsItem +{ + public: + + RsDistribMsgHstry(uint16_t service_type) + :RsItem(RS_PKT_VERSION_SERVICE, service_type, RS_PKT_SUBTYPE_DISTRIB_MSG_HSTRY) + { return; } + + RsDistribMsgHstry() + :RsItem(RS_PKT_VERSION_SERVICE, RS_SERVICE_TYPE_DISTRIB, RS_PKT_SUBTYPE_DISTRIB_MSG_HSTRY) + { return; } + +virtual ~RsDistribMsgHstry() { return; } + +virtual void clear(); +virtual std::ostream& print(std::ostream &out, uint16_t indent = 0); + + std::string grpId; /* Grp Id */ + std::string msgHstryFilePath; // path to where archived msgs are stored + std::string msgHstryFileHash; // hash of file at file path +}; class RsDistribSerialiser: public RsSerialType { @@ -248,6 +273,10 @@ virtual uint32_t sizeConfigData(RsDistribConfigData *); virtual bool serialiseConfigData(RsDistribConfigData *item, void *data, uint32_t *size); virtual RsDistribConfigData *deserialiseConfigData(void* data, uint32_t *size); +/* For RS_PKT_SUBTYPE_DISTRIB_MSG_HSTRY */ +virtual uint32_t sizeMsgHstry(RsDistribMsgHstry *); +virtual bool serialiseMsgHstry(RsDistribMsgHstry *item, void *data, uint32_t *size); +virtual RsDistribMsgHstry*deserialiseMsgHstry(void* data, uint32_t *size); const uint16_t SERVICE_TYPE; diff --git a/libretroshare/src/serialiser/rsforumitems.h b/libretroshare/src/serialiser/rsforumitems.h index f83148f78..639246aef 100644 --- a/libretroshare/src/serialiser/rsforumitems.h +++ b/libretroshare/src/serialiser/rsforumitems.h @@ -81,7 +81,7 @@ public: std::string forumId; - /// a map which contains the read for messages within a forum + /// a map (msgId, status bit-field) which contains the read status for messages within a forum std::map msgReadStatus; }; diff --git a/libretroshare/src/services/p3blogs.cc b/libretroshare/src/services/p3blogs.cc index c865282f2..51cb06f3b 100644 --- a/libretroshare/src/services/p3blogs.cc +++ b/libretroshare/src/services/p3blogs.cc @@ -68,14 +68,16 @@ RsBlogs *rsBlogs = NULL; /* Blogs will be initially stored for 1 year * remember 2^16 = 64K max units in store period. * PUBPERIOD * 2^16 = max STORE PERIOD */ -#define BLOG_STOREPERIOD (90*24*3600) /* 30 * 24 * 3600 - secs in a year */ +#define BLOG_STOREPERIOD (60*24*3600) /* 30 * 24 * 3600 - secs in a year */ #define BLOG_PUBPERIOD 600 /* 10 minutes ... (max = 455 days) */ +#define BLOG_ARCHIVE_PERIOD (180*24*3600) /* 180 * 24 * 3600 - secs in a year */ p3Blogs::p3Blogs(uint16_t type, CacheStrapper *cs, CacheTransfer *cft, std::string srcdir, std::string storedir) :p3GroupDistrib(type, cs, cft, srcdir, storedir, "", - CONFIG_TYPE_QBLOG, BLOG_STOREPERIOD, BLOG_PUBPERIOD) + CONFIG_TYPE_QBLOG, BLOG_STOREPERIOD, BLOG_ARCHIVE_PERIOD, + BLOG_PUBPERIOD) { return; } diff --git a/libretroshare/src/services/p3channels.cc b/libretroshare/src/services/p3channels.cc index 3ec1e4d6e..7a096c06e 100644 --- a/libretroshare/src/services/p3channels.cc +++ b/libretroshare/src/services/p3channels.cc @@ -66,13 +66,15 @@ RsChannels *rsChannels = NULL; * PUBPERIOD * 2^16 = max STORE PERIOD */ #define CHANNEL_STOREPERIOD (30*24*3600) /* 30 * 24 * 3600 - secs in a 30 day month */ #define CHANNEL_PUBPERIOD 120 /* 2 minutes ... (max = 455 days) */ +#define CHANNEL_ARCHIVE_PERIOD (180*24*3600) #define MAX_AUTO_DL 1E9 /* auto download of attachment limit; 1 GIG */ p3Channels::p3Channels(uint16_t type, CacheStrapper *cs, CacheTransfer *cft, RsFiles *files, std::string srcdir, std::string storedir, std::string chanDir) :p3GroupDistrib(type, cs, cft, srcdir, storedir, chanDir, - CONFIG_TYPE_CHANNELS, CHANNEL_STOREPERIOD, CHANNEL_PUBPERIOD), + CONFIG_TYPE_CHANNELS, CHANNEL_STOREPERIOD, CHANNEL_ARCHIVE_PERIOD, + CHANNEL_PUBPERIOD), mRsFiles(files), mChannelsDir(chanDir) { diff --git a/libretroshare/src/services/p3distrib.cc b/libretroshare/src/services/p3distrib.cc index 16877ecf1..09e22563f 100644 --- a/libretroshare/src/services/p3distrib.cc +++ b/libretroshare/src/services/p3distrib.cc @@ -95,17 +95,22 @@ GroupInfo::~GroupInfo() delete it->second ; } +RsDistribMsgArchive::RsDistribMsgArchive(){ + loaded = false; +} + p3GroupDistrib::p3GroupDistrib(uint16_t subtype, CacheStrapper *cs, CacheTransfer *cft, std::string sourcedir, std::string storedir, std::string keyBackUpDir, uint32_t configId, - uint32_t storePeriod, uint32_t pubPeriod) + uint32_t storePeriod, uint32_t archivePeriod, uint32_t pubPeriod) :CacheSource(subtype, true, cs, sourcedir), CacheStore(subtype, true, cs, cft, storedir), p3Config(configId), p3ThreadedService(subtype), mHistoricalCaches(true), - mStorePeriod(storePeriod), + mStorePeriod(storePeriod), + mArchivePeriod(archivePeriod), mPubPeriod(pubPeriod), mLastPublishTime(0), mMaxCacheSubId(1), @@ -199,12 +204,12 @@ int p3GroupDistrib::tick() receivePubKeys(); } - // update cache document every 1 minute (5 mins in production) + // update cache document every 1 minute (should be 5 mins in production) // after historical files have loaded and there is reason to bool updateCacheDoc = false; { RsStackMutex stack(distribMtx); - updateCacheDoc = (now > (time_t) (mLastCacheDocUpdate + 30)); + updateCacheDoc = (now > (time_t) (mLastCacheDocUpdate + 10)); updateCacheDoc &= !mHistoricalCaches && mUpdateCacheDoc && mHistoricalCachesLoaded; #ifdef DISTRIB_HISTORY_DEBUG std::cerr << "num pending grps: " << mGrpHistPending.size() << std::endl; @@ -213,13 +218,11 @@ int p3GroupDistrib::tick() #endif } -#ifdef ENABLE_CACHE_OPT if(updateCacheDoc){ std::cerr << "count: " << mCount << std::endl; updateCacheDocument(); } -#endif return 0; } @@ -316,7 +319,7 @@ void p3GroupDistrib::updateCacheDocument() std::vector grpNodes; std::string failedCacheId = FAILED_CACHE_CONT; - // failed cache content node is has not been created add to doc + // failed cache content node has not been created, so add to doc if(mCacheTable.find(failedCacheId) == mCacheTable.end()){ mCacheDoc.append_child("group"); @@ -384,7 +387,7 @@ void p3GroupDistrib::updateCacheDocument() // add groups to cache table locked_updateCacheTableGrp(grpNodes, false); - //grpNodeIter.clear(); + std::map > msgCacheMap; pugi::xml_node nodeIter; @@ -402,7 +405,6 @@ void p3GroupDistrib::updateCacheDocument() pCacheId pCid; int count = 0; -// int count2 = 0, count3 = 0; for(; msgIt != mMsgHistPending.end(); msgIt++) { @@ -415,17 +417,6 @@ void p3GroupDistrib::updateCacheDocument() pCid = pCacheId(msgIt->second.first, msgIt->second.second); - // ensure you don't add cache ids twice to same group -// // by checking cache table and current msg additions -// if(nodeCache_iter->second.cIdSet.find(pCid) != -// nodeCache_iter->second.cIdSet.end()) -// count2++; -// -// if(msgCacheMap[msgIt->first].find(pCid) != msgCacheMap[msgIt->first].end()) -// count3++; - - - nodeIter = nodeCache_iter->second.node; messages_node = nodeIter.child("messages"); @@ -446,7 +437,6 @@ void p3GroupDistrib::updateCacheDocument() // add msg to grp set msgCacheMap[msgIt->first].insert(pCid); - count++; } else{ @@ -462,8 +452,6 @@ void p3GroupDistrib::updateCacheDocument() } } - - // now update cache table by tagging msg cache ids to their // respective groups locked_updateCacheTableMsg(msgCacheMap); @@ -493,6 +481,54 @@ void p3GroupDistrib::updateCacheDocument() return; } +struct by_cacheid +{ + bool operator()(pugi::xml_node node) const + { + bool cachIdEqual = true; + cachIdEqual &= (node.child_value("subId") == subId); + cachIdEqual &= (node.child_value("pId") == peerId); + return cachIdEqual; + } + + std::string peerId; + std::string subId; +}; + +void p3GroupDistrib::locked_removeCacheTableEntry(const pCacheId& pCid) +{ + + // search through cache document for all entries with this cache id + + pugi::xml_node_iterator nit = mCacheDoc.begin(); + by_cacheid bCid; + bCid.peerId = pCid.first; + + char subIdBuffer[6]; + std::string subId; + sprintf(subIdBuffer, "%d", pCid.second); + subId = subIdBuffer; + bCid.subId = subId; + + // for each grp, remove message nodes that match pCid + for(; nit != mCacheDoc.end(); nit++) + { + pugi::xml_node msgNode = nit->child("messages"); + + if(msgNode) + { + while(pugi::xml_node cNode= msgNode.find_child(bCid)) + { + msgNode.remove_child(cNode); + } + } + } + + locked_buildCacheTable(); + + return; +} + void p3GroupDistrib::locked_updateCacheTableGrp(const std::vector& grpNodes, bool historical) { @@ -570,6 +606,7 @@ bool p3GroupDistrib::locked_historyCached(const std::string& grpId, bool& cached std::map::iterator cit; if(mCacheTable.end() != (cit = mCacheTable.find(grpId))) { + cached = cit->second.cached; return true; } @@ -578,6 +615,8 @@ bool p3GroupDistrib::locked_historyCached(const std::string& grpId, bool& cached return false; } + + bool p3GroupDistrib::locked_historyCached(const pCacheId& cId) { @@ -605,6 +644,9 @@ bool p3GroupDistrib::locked_buildCacheTable(){ return false; } + // clear cache table + mCacheTable.clear(); + pugi::xml_node_iterator grpIt = mCacheDoc.begin(), msgIt; pugi::xml_node messages_node; std::map > msgCacheMap; @@ -662,12 +704,9 @@ bool p3GroupDistrib::locked_buildCacheTable(){ return true; } -void p3GroupDistrib::locked_processHistoryCached(const std::string& grpId) +void p3GroupDistrib::processHistoryCached(const std::string& grpId) { - // no processing should be done until cache locations have been stored in memory - if(mHistoricalCaches) - return; #ifdef DISTRIB_HISTORY_DEBUG std::cerr << "p3GroupDistrib::locked_processHistoryCached() " @@ -675,25 +714,43 @@ void p3GroupDistrib::locked_processHistoryCached(const std::string& grpId) #endif bool cached = true; - locked_historyCached(grpId, cached); + { + RsStackMutex stack(distribMtx); + + // no processing should be done until cache locations have been stored in memory + if(mHistoricalCaches) + return; + + locked_historyCached(grpId, cached); + } std::list cDataList; std::list::iterator cit; std::string file; - CacheData cDataTemp; + uint16_t cacheType = CacheSource::getCacheType(); // if not history cached then load it if(!cached) { + + // get list of cache id belonging to grp - locked_getHistoryCacheData(grpId, cDataList); - cit = cDataList.begin(); + { + RsStackMutex stack(distribMtx); + locked_getHistoryCacheData(grpId, cDataList); - for(; cit != cDataList.end(); cit++){ + cit = cDataList.begin(); + + for(; cit != cDataList.end(); cit++){ + cit->cid.type = cacheType; + locked_getStoredCache(*cit); + } + } + + // now load + for(cit = cDataList.begin(); cit != cDataList.end(); cit++){ - cit->cid.type = cacheType; - locked_getStoredCache(*cit); file = cit->path; file += "/"; file += cit->name; @@ -701,13 +758,16 @@ void p3GroupDistrib::locked_processHistoryCached(const std::string& grpId) // note: you could load msgs for a cache historied group that is not loaded, // but any request for info of affected grp will consequently load // all its msgs through this function anyways - locked_loadFileMsgs(file, cit->cid.subid, cit->pid, cit->recvd, false, true); + loadFileMsgs(file, cit->cid.subid, cit->pid, cit->recvd, (cit->pid == mOwnId), true, true); } - } - locked_updateCacheTableEntry(grpId, true); + { + RsStackMutex stack(distribMtx); + locked_updateCacheTableEntry(grpId, true); + } + return; } @@ -768,8 +828,7 @@ void p3GroupDistrib::locked_getHistoryCacheData(const std::string& grpId, std::l } else locked_getStoredCache(cDataTemp); - - cDataSet.push_back(cDataTemp); + cDataSet.push_back(cDataTemp); } } else @@ -831,6 +890,168 @@ bool p3GroupDistrib::locked_loadHistoryCacheFile() return ok; } +bool p3GroupDistrib::loadArchive(const std::string& grpId) +{ + + std::string filename; + msgArchMap::iterator it; + + { + RsStackMutex stack(distribMtx); + it = mMsgArchive.find(grpId); + + if(it != mMsgArchive.end()) + { + filename = it->second->msgFilePath; + } + else + { + return false; + } + } + + uint32_t bioflags = BIN_FLAGS_HASH_DATA | BIN_FLAGS_READABLE; + uint32_t stream_flags = BIN_FLAGS_READABLE; + + BinEncryptedFileInterface *bio = new BinEncryptedFileInterface(filename.c_str(), bioflags); + pqiSSLstore stream(setupSerialiser(), "CONFIG", bio, stream_flags); + + std::list load; + + if(!stream.getEncryptedItems(load)) + { +#ifdef DISTRIB_ARCH_DEBUG + std::cerr << "p3Distrib::loadArchive() Error occurred trying to msg Archive Item" << std::endl; +#endif + return false; + } + + RsStackMutex stack(distribMtx); + + if(it->second->msgFileHash != bio->gethash()) + { +#ifdef DISTRIB_ARCH_DEBUG + std::cerr << "p3Distrib::loadArchive() Error occurred archived File's Hash invalid" << std::endl; +#endif + + return false; + } + + std::list::iterator it = load.begin(); + RsDistribSignedMsg* rsdm = NULL; + for(;it!=load.end(); it++) + { + if(NULL != (rsdm = dynamic_cast(*it))){ + it->second->msgs.push_back(rsdm); + } + else + { + if(*it) + delete *it; + } + } + + it->second->loaded = true; + + return true; +} + + +bool p3GroupDistrib::locked_archiveMsg(const std::string& grpId, + RsDistribSignedMsg* msg) +{ + + // check if msg is within archive period + + /* check timestamp */ + time_t now = time(NULL); + uint32_t min = now - mArchivePeriod; + uint32_t max = now - mStorePeriod; + + if ((msg->timestamp < min) || (msg->timestamp > max)) + { +#ifdef DISTRIB_ARCH_DEBUG + std::cerr << "p3Distrib::locked_archiveMsg() Error, Msg to old to Archive " << std::endl; +#endif + + return false; + } + + msgArchMap::iterator it = mMsgArchive.find(grpId); + + // check an entry exists already + if(it != mMsgArchive.end()) + { + it->second->msgs.push_back(msg); + it->second->toArchive = true; + } + else // if not then make one + { + RsDistribMsgArchive* msgArch = new RsDistribMsgArchive(); + msgArch->msgs.push_back(msg); + msgArch->grpId = grpId; + msgArch->loaded = false; + msgArch->toArchive = true; + mMsgArchive.insert(std::pair( + grpId, msgArch)); + } + + return true; +} + +bool p3GroupDistrib::sendArchiveToFile(RsDistribMsgArchive* msgArch) +{ + + std::string filename = mKeyBackUpDir + "/grp-" + msgArch->grpId + "-archive.dist"; + + // encrypted storage + uint32_t bioflags = BIN_FLAGS_HASH_DATA | BIN_FLAGS_WRITEABLE; + uint32_t stream_flags = BIN_FLAGS_WRITEABLE; + + stream_flags |= BIN_FLAGS_NO_DELETE; + + BinEncryptedFileInterface *bio = new BinEncryptedFileInterface(filename.c_str(), bioflags); + pqiSSLstore *stream = new pqiSSLstore(setupSerialiser(), "CONFIG", bio, stream_flags); + + bool written = stream->encryptedSendItems(msgArch->msgs); + + msgArch->msgFileHash = bio->gethash(); + + if(msgArch->msgFileHash.empty()) + return false; + + msgArch->msgFilePath = filename; + + return written; +} + + +void p3GroupDistrib::archiveRun() +{ + + // quite expensive + RsStackMutex stack(distribMtx); + + msgArchMap::iterator it = mMsgArchive.begin(); + + // go through and archive all files + for(; it!=mMsgArchive.end(); it++) + { + + if(!it->second->toArchive) + { + sendArchiveToFile(it->second); + it->second->toArchive = false; + } + + } + + // indicate config to save meta data (file location and grpId pair) + IndicateConfigChanged(); + return; +} + + bool p3GroupDistrib::locked_saveHistoryCacheFile() { @@ -839,7 +1060,7 @@ bool p3GroupDistrib::locked_saveHistoryCacheFile() return false; std::string hFileName = mKeyBackUpDir + "/" + HIST_CACHE_FNAME; - std::ofstream hFile(hFileName.c_str(), std::ios::binary | std::ios::out); + std::ofstream hFile(hFileName.c_str(), std::ios::binary | std::ios::out); std::ostringstream cacheStream; char* fileBuffer = NULL; int streamLength; @@ -1007,7 +1228,7 @@ int p3GroupDistrib::loadAnyCache(const CacheData &data, bool local, bool his } else { - loadFileMsgs(file, data.cid.subid, data.pid, data.recvd, local, historical); + loadFileMsgs(file, data.cid.subid, data.pid, data.recvd, local, historical, false); } return true; @@ -1098,7 +1319,8 @@ void p3GroupDistrib::loadFileGroups(const std::string &filename, const std::stri return; } -void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical) +void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical, + bool cacheLoad) { #ifdef DISTRIB_DEBUG @@ -1109,24 +1331,31 @@ void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSub time_t now = time(NULL); bool cache = false; -#ifdef ENABLE_CACHE_OPT // if cache id exists in cache table exit { RsStackMutex stack(distribMtx); + // if this is a cache load proceed if not check + // cache id exists in cache table, if so don't load - if(locked_historyCached(pCacheId(src, cacheSubId))){ - return; - } - else + if(!cacheLoad) { - cache = true; + if(historical && locked_historyCached(pCacheId(src, cacheSubId))) + { + return; + } + else + { + cache = true; + } } + } -#endif // link grp to cache id (only one cache id, so doesn't matter if one grp comes out twice // with same cache id) std::map msgCacheMap; + + // if message loaded before check failed cache pCacheId failedCache = pCacheId(src, cacheSubId); /* create the serialiser to load msgs */ BinInterface *bio = new BinFileInterface(filename.c_str(), BIN_FLAGS_READABLE); @@ -1229,133 +1458,6 @@ void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSub return; } -//TODO make carbon copy of sister -void p3GroupDistrib::locked_loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical) -{ - -#ifdef DISTRIB_DEBUG - std::cerr << "p3GroupDistrib::loadFileMsgs()"; - std::cerr << std::endl; -#endif - - time_t now = time(NULL); - bool cache = false; - -#ifdef ENABLE_CACHE_OPT - // if cache id exists in cache table exit - if(!historical){ - if(locked_historyCached(pCacheId(src, cacheSubId))){ - return; - } - else - { - cache = true; - } - } -#endif - - // link grp to cache id (only one cache id, so doesn't matter if one grp comes out twice - // with same cache id) - std::map msgCacheMap; - pCacheId failedCache = pCacheId(src, cacheSubId); - /* create the serialiser to load msgs */ - BinInterface *bio = new BinFileInterface(filename.c_str(), BIN_FLAGS_READABLE); - pqistore *store = createStore(bio, src, BIN_FLAGS_READABLE); - -#ifdef DISTRIB_DEBUG - std::cerr << "loading file " << filename << std::endl ; -#endif - - RsItem *item; - RsDistribSignedMsg *newMsg; - std::string grpId; - - while(NULL != (item = store->GetItem())) - { -#ifdef DISTRIB_DEBUG - std::cerr << "p3GroupDistrib::loadFileMsgs() Got Item:"; - std::cerr << std::endl; - item->print(std::cerr, 10); - std::cerr << std::endl; -#endif - - if ((newMsg = dynamic_cast(item))) - { - grpId = newMsg->grpId; - if(locked_loadMsg(newMsg, src, local, historical)) - { - if(cache) - { - msgCacheMap.insert(grpCachePair(grpId, pCacheId(src, cacheSubId))); - } - } - } - else - { -#ifdef DISTRIB_DEBUG - std::cerr << "p3GroupDistrib::loadFileMsgs() Unexpected Item - deleting"; - std::cerr << std::endl; -#endif - /* wrong message type */ - delete item; - } - } - - std::map::iterator mit; - - if(cache){ - - mit = msgCacheMap.begin(); - for(;mit != msgCacheMap.end(); mit++) - { - mMsgHistPending.push_back(grpCachePair(mit->first, mit->second)); - } - mUpdateCacheDoc = true; - if(!msgCacheMap.empty()) - mCount++; - - std::string failedCacheId = FAILED_CACHE_CONT; - - // if msg cache map is empty then cache id failed - if(msgCacheMap.empty()) - mMsgHistPending.push_back(grpCachePair(failedCacheId, failedCache)); - } - - if (local) - { - /* now we create a map of time -> subid - * This is used to determine the newest and the oldest items - */ -#ifdef DISTRIB_DEBUG - std::cerr << "p3GroupDistrib::loadFileMsgs() Updating Local TimeStamps"; - std::cerr << std::endl; - std::cerr << "p3GroupDistrib::loadFileMsgs() CacheSubId: " << cacheSubId << " recvd: " << ts; - std::cerr << std::endl; -#endif - - mLocalCacheTs[ts] = cacheSubId; - if (cacheSubId > mMaxCacheSubId) - { -#ifdef DISTRIB_DEBUG - std::cerr << "p3GroupDistrib::loadFileMsgs() New Max CacheSubId"; - std::cerr << std::endl; -#endif - mMaxCacheSubId = cacheSubId; - } - - if (((time_t) ts < now) && ((time_t) ts > mLastPublishTime)) - { -#ifdef DISTRIB_DEBUG - std::cerr << "p3GroupDistrib::loadFileMsgs() New LastPublishTime"; - std::cerr << std::endl; -#endif - mLastPublishTime = ts; - } - } - - delete store; - return; -} /***************************************************************************************/ /***************************************************************************************/ /********************** load Cache Msgs ***************************************/ @@ -1671,9 +1773,15 @@ bool p3GroupDistrib::loadMsg(RsDistribSignedMsg *newMsg, const std::string &src, std::cerr << "p3GroupDistrib::loadMsg() check failed" << std::endl; std::cerr << std::endl; #endif - delete newMsg; - delete msg; - return false; + + // out of range, archive if subscribed and if archiving + // successful allow to continue loading + if(!locked_archiveMsg(newMsg->grpId, newMsg) + && (git->second.flags & RS_DISTRIB_SUBSCRIBED)){ + delete newMsg; + delete msg; + return false; + } } /* accept message */ @@ -1985,7 +2093,10 @@ void p3GroupDistrib::locked_publishPendingMsgs() newCache.pid = mOwnId; newCache.cid.type = CacheSource::getCacheType(); - newCache.cid.subid = locked_determineCacheSubId(); + newCache.cid.subid = locked_determineCacheSubId(); + + // remove old cache entry using this pid + locked_removeCacheTableEntry(pCacheId(newCache.pid, newCache.cid.subid)); /* create filename */ std::string path = CacheSource::getCacheDir(); @@ -2001,6 +2112,11 @@ void p3GroupDistrib::locked_publishPendingMsgs() bool resave = false; std::list::iterator it; + + // for cache opt + std::list gcpList; + pCacheId pcId(newCache.pid, newCache.cid.subid); + for(it = mPendingPublish.begin(); it != mPendingPublish.end(); it++) { #ifdef DISTRIB_DEBUG @@ -2022,6 +2138,8 @@ void p3GroupDistrib::locked_publishPendingMsgs() // prevent sending original source of message to peers (*it)->PeerId(mOwnId); + gcpList.push_back(grpCachePair((*it)->grpId, pcId)); + if(!store->SendItem(*it)) /* deletes it */ { ok &= false; @@ -2060,6 +2178,7 @@ void p3GroupDistrib::locked_publishPendingMsgs() if(ok) refreshCache(newCache); + std::list::iterator git = gcpList.begin(); if (ok && resave) { @@ -2069,7 +2188,15 @@ void p3GroupDistrib::locked_publishPendingMsgs() #endif /* flag to store config (saying we've published messages) */ IndicateConfigChanged(); /**** INDICATE CONFIG CHANGED! *****/ + + // add new cache to cache opt doc + + for(;git != gcpList.end(); git++) + mMsgHistPending.push_back(*git); + + mUpdateCacheDoc = true; } + } @@ -2299,11 +2426,12 @@ void p3GroupDistrib::getPopularGroupList(uint32_t popMin, uint32_t popMax, std:: bool p3GroupDistrib::getAllMsgList(const std::string& grpId, std::list &msgIds) { + processHistoryCached(grpId); + + RsStackMutex stack(distribMtx); /************* STACK MUTEX ************/ -#ifdef ENABLE_CACHE_OPT - locked_processHistoryCached(grpId); -#endif + std::map::iterator git; if (mGroups.end() == (git = mGroups.find(grpId))) @@ -2324,11 +2452,11 @@ bool p3GroupDistrib::getAllMsgList(const std::string& grpId, std::list &msgIds) { + + processHistoryCached(grpId); + RsStackMutex stack(distribMtx); /************* STACK MUTEX ************/ -#ifdef ENABLE_CACHE_OPT - locked_processHistoryCached(grpId); -#endif std::map::iterator git; if (mGroups.end() == (git = mGroups.find(grpId))) @@ -2389,7 +2517,8 @@ RsDistribMsg *p3GroupDistrib::locked_getGroupMsg(const std::string& grpId, const /************* ALREADY LOCKED ************/ - locked_processHistoryCached(grpId); + +// processHistoryCached(grpId); std::map::iterator git; if (mGroups.end() == (git = mGroups.find(grpId))) @@ -2678,6 +2807,20 @@ bool p3GroupDistrib::saveList(bool &cleanup, std::list& saveData) } + + /* save msg history meta data */ + msgArchMap::iterator maIt = mMsgArchive.begin(); + + for(;maIt != mMsgArchive.end(); maIt++) + { + RsDistribMsgHstry* msgHstry = new RsDistribMsgHstry(); + msgHstry->grpId = maIt->first; + msgHstry->msgHstryFileHash = maIt->second->msgFileHash; + msgHstry->msgHstryFilePath = maIt->second->msgFilePath; + saveData.push_back(msgHstry); + saveCleanupList.push_back(msgHstry); + } + std::list childSaveL = childSaveList(); std::list::iterator cit = childSaveL.begin(); RsSerialType *childSer = createSerialiser(); @@ -2700,9 +2843,7 @@ bool p3GroupDistrib::saveList(bool &cleanup, std::list& saveData) delete childSer; // now save hostory doc -#ifdef ENABLE_CACHE_OPT locked_saveHistoryCacheFile(); -#endif return true; } @@ -2726,14 +2867,12 @@ bool p3GroupDistrib::loadList(std::list& load) { std::list::iterator lit; -#ifdef ENABLE_CACHE_OPT { RsStackMutex stack(distribMtx); if(locked_loadHistoryCacheFile()) locked_buildCacheTable(); } -#endif /* for child config data */ std::list childLoadL; @@ -2748,6 +2887,7 @@ bool p3GroupDistrib::loadList(std::list& load) RsDistribGrpKey *newKey = NULL; RsDistribSignedMsg *newMsg = NULL; RsDistribConfigData* newChildConfig = NULL; + RsDistribMsgHstry* msgHstry = NULL; if ((newGrp = dynamic_cast(*lit))) @@ -2755,7 +2895,6 @@ bool p3GroupDistrib::loadList(std::list& load) const std::string &gid = newGrp -> grpId; if(loadGroup(newGrp, false)){ -#ifdef ENABLE_CACHE_OPT bool cached = false; RsStackMutex stack(distribMtx); @@ -2765,7 +2904,6 @@ bool p3GroupDistrib::loadList(std::list& load) mGrpHistPending.push_back(gcPair); mUpdateCacheDoc = true; } -#endif } subscribeToGroup(gid, true); } @@ -2790,6 +2928,24 @@ bool p3GroupDistrib::loadList(std::list& load) newMsg->PeerId(mOwnId); loadMsg(newMsg, mOwnId, false, false); /* false so it'll pushed to PendingPublish list */ } + else if ((msgHstry = dynamic_cast(*lit))) + { + RsDistribMsgArchive* msgArch = new RsDistribMsgArchive(); + msgArch->grpId = msgHstry->grpId; + msgArch->loaded = false; + msgArch->msgFileHash = msgHstry->msgHstryFileHash; + msgArch->msgFilePath = msgHstry->msgHstryFilePath; + msgArch->toArchive = false; + + { + RsStackMutex stack(distribMtx); + mMsgArchive.insert(std::pair( + msgArch->grpId, msgArch)); + } + + delete msgHstry; + + } else if ((newChildConfig = dynamic_cast(*lit))) { RsItem* childConfigItem = childSer->deserialise(newChildConfig->service_data.bin_data, @@ -2798,6 +2954,7 @@ bool p3GroupDistrib::loadList(std::list& load) childLoadL.push_back(childConfigItem); } + } /* no need to republish until something new comes in */ @@ -5092,17 +5249,15 @@ bool p3GroupDistrib::locked_printDummyMsgs(GroupInfo &grp) bool p3GroupDistrib::getDummyParentMsgList(const std::string& grpId, const std::string& pId, std::list &msgIds) { + // load grp from history cache if not already loaded + processHistoryCached(grpId); + #ifdef DISTRIB_DUMMYMSG_DEBUG std::cerr << "p3GroupDistrib::getDummyParentMsgList(grpId:" << grpId << "," << pId << ")"; std::cerr << std::endl; #endif RsStackMutex stack(distribMtx); /************* STACK MUTEX ************/ - // load grp from history cache if not already loaded -#ifdef ENABLE_CACHE_OPT - locked_processHistoryCached(grpId); -#endif - std::map::iterator git; if (mGroups.end() == (git = mGroups.find(grpId))) { @@ -5133,7 +5288,9 @@ bool p3GroupDistrib::getDummyParentMsgList(const std::string& grpId, const std:: RsDistribDummyMsg *p3GroupDistrib::locked_getGroupDummyMsg(const std::string& grpId, const std::string& msgId) { - locked_processHistoryCached(grpId); + +// processHistoryCached(grpId); + #ifdef DISTRIB_DUMMYMSG_DEBUG std::cerr << "p3GroupDistrib::locked_getGroupDummyMsg(grpId:" << grpId << "," << msgId << ")"; std::cerr << std::endl; diff --git a/libretroshare/src/services/p3distrib.h b/libretroshare/src/services/p3distrib.h index 738fd1c74..31291b1cb 100644 --- a/libretroshare/src/services/p3distrib.h +++ b/libretroshare/src/services/p3distrib.h @@ -250,6 +250,7 @@ typedef std::pair grpNodePair; // (is loaded, iter // these make up a cache list typedef std::pair pCacheId; //(pid, subid) typedef std::pair grpCachePair; // (grpid, cid) +typedef std::map msgArchMap; /*! * grp node content for faster access @@ -309,15 +310,43 @@ class CacheDataPending bool mHistorical; }; +class RsDistribMsgArchive +{ +public: + + RsDistribMsgArchive(); + + std::list msgs; + std::string grpId; + std::string msgFileHash; + std::string msgFilePath; + bool loaded; + bool toArchive; + +}; + class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, public p3ThreadedService { public: - p3GroupDistrib(uint16_t subtype, + /*! + * + * @param subtype service type + * @param cs handle to cache strapper + * @param cft handle to cache transfer, required to correctly initialise p3GroupDistrib + * @param sourcedir directory for remote cache files + * @param storedir directory for local cache files + * @param keyBackUpDir when key back function invoked, keys are stored here + * @param configId + * @param storePeriod how long local msgs are kept for + * @param archivePeriod how long archived msgs are kept for + * @param pubPeriod length of time interval before pending msgs/grps are published + */ + p3GroupDistrib(uint16_t subtype, CacheStrapper *cs, CacheTransfer *cft, std::string sourcedir, std::string storedir, std::string keyBackUpDir, uint32_t configId, - uint32_t storePeriod, uint32_t pubPeriod); + uint32_t storePeriod, uint32_t archivePeriod, uint32_t pubPeriod); virtual ~p3GroupDistrib() ; @@ -394,12 +423,6 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu */ bool locked_buildCacheTable(void); - /*! - * if grp's message is not loaded, load it, and update cache table - * @param grpId group whose messages to load if not cached - */ - void locked_processHistoryCached(const std::string& grpId); - /*! * loads cache data which contains location of cache files belonging @@ -419,6 +442,47 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu */ bool locked_loadHistoryCacheFile(); + /*! + * this removes the given cache id and associated msgs nodes from + * all grp nodes + * cache table is updated to reflect document + * this costly, and is here to be called once a year has been reached on + * @param pCid the cache id to remove from cache document + */ + void locked_removeCacheTableEntry(const pCacheId& pCid); + + /*! + * + * @param grpId + * @param msg + * @return + */ + bool locked_archiveMsg(const std::string& grpId, RsDistribSignedMsg* msg); + + /*! + * + * @param grpId archive msgs to load + * @return false if there are no archived msgs + */ + bool loadArchive(const std::string& grpId); + + + /*! + * the hash and path for msgArch is set here + * do not call frequently expensive IO + * @param msgArch the archive to send to file + * @return if archiving to file succeeded + */ + bool sendArchiveToFile(RsDistribMsgArchive* msgArch); + + /*! + * to be called, preferably in periods, archives flagged + * to be sent to file will be archived and + * IndicateConfigChanged is called to save + * archive file locations + */ + void archiveRun(); + private: /* these lists are filled by the overloaded fns... then cleared by the thread */ @@ -430,8 +494,18 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu /* load cache files */ void loadFileGroups(const std::string &filename, const std::string &src, bool local, bool historical, const pCacheId& cid); - void loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical); - void locked_loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical); + + /*! + * @param filename absolute cache file path + * @param cacheSubId cache subid, needed to save cache to history file + * @param src peer src id + * @param ts timestamp + * @param local set to whether it islocal or remote cache + * @param historical set to whether it is an old cache + * @param cacheLoad is a history cache opt load, prevent adding to cache history again + */ + void loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, + bool local, bool historical, bool cacheLoad); bool backUpKeys(const std::list &keysToBackUp, std::string grpId); void locked_sharePubKey(); @@ -478,6 +552,11 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu bool loadGroupKey(RsDistribGrpKey *newKey, bool historical); + /*! + * if grp's message is not loaded, load it, and update cache table + * @param grpId group whose messages to load if not cached + */ + void processHistoryCached(const std::string& grpId); /***************************************************************************************/ /***************************************************************************************/ @@ -658,6 +737,9 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu void locked_publishPendingMsgs(); /*! + * This function is key to determining how long caches permeate + * a distributed network, after mStorePeriod has elapsed for a message + * it is over written since its cache subid is used for the cache file name * @return cache sub id */ uint16_t locked_determineCacheSubId(); @@ -851,7 +933,7 @@ RsDistribDummyMsg *locked_getGroupDummyMsg(const std::string& grpId, const std:: std::list mLocalCaches; std::map mGroups; - uint32_t mStorePeriod, mPubPeriod; + uint32_t mStorePeriod, mPubPeriod, mArchivePeriod; /* Message Publishing */ std::list mPendingPublish; @@ -888,11 +970,14 @@ RsDistribDummyMsg *locked_getGroupDummyMsg(const std::string& grpId, const std:: time_t mLastCacheDocUpdate; bool mUpdateCacheDoc, mHistoricalCachesLoaded; + std::map mCacheTable; // (cid, node) /// contains information on cached data pugi::xml_document mCacheDoc; - + + /* msg archiving */ + msgArchMap mMsgArchive; }; diff --git a/libretroshare/src/services/p3forums.cc b/libretroshare/src/services/p3forums.cc index 45d3c9b2a..509be38f0 100644 --- a/libretroshare/src/services/p3forums.cc +++ b/libretroshare/src/services/p3forums.cc @@ -75,13 +75,15 @@ RsForums *rsForums = NULL; /* Forums will be initially stored for 1 year * remember 2^16 = 64K max units in store period. * PUBPERIOD * 2^16 = max STORE PERIOD */ -#define FORUM_STOREPERIOD (365*24*3600) /* 365 * 24 * 3600 - secs in a year */ -#define FORUM_PUBPERIOD 600 /* 10 minutes ... (max = 455 days) */ +#define FORUM_STOREPERIOD (60*24*3600) /* 60 * 24 * 3600 - secs in two months */ +#define FORUM_PUBPERIOD 30 /* 10 minutes ... (max = 455 days) */ +#define FORUM_ARCHIVE_PERIOD (365*24*3600) /* 365 * 24 * 3600 - secs in a year */ p3Forums::p3Forums(uint16_t type, CacheStrapper *cs, CacheTransfer *cft, std::string srcdir, std::string storedir, std::string forumDir) :p3GroupDistrib(type, cs, cft, srcdir, storedir, forumDir, - CONFIG_TYPE_FORUMS, FORUM_STOREPERIOD, FORUM_PUBPERIOD), + CONFIG_TYPE_FORUMS, FORUM_STOREPERIOD, FORUM_ARCHIVE_PERIOD, + FORUM_PUBPERIOD), mForumsDir(forumDir) { @@ -294,6 +296,7 @@ bool p3Forums::getForumThreadMsgList(const std::string &fId, const std::string & bool p3Forums::getForumMessage(const std::string &fId, const std::string &mId, ForumMsgInfo &info) { + processHistoryCached(fId); RsStackMutex stack(distribMtx); /***** STACK LOCKED MUTEX *****/ RsDistribMsg *msg = locked_getGroupMsg(fId, mId); @@ -550,7 +553,7 @@ bool p3Forums::getMessageCount(const std::string &fId, unsigned int &newCount, u if (grpFlags & (RS_DISTRIB_ADMIN | RS_DISTRIB_SUBSCRIBED)) { std::list msgIds; - if (getAllMsgList(fId, msgIds)) { + if (getAllMsgList(fId, msgIds)) { // get msg ids without causing a costly cache load RsStackMutex stack(distribMtx); /***** STACK LOCKED MUTEX *****/ diff --git a/retroshare-gui/src/RetroShare.pro b/retroshare-gui/src/RetroShare.pro index 833d34678..b337b65ff 100644 --- a/retroshare-gui/src/RetroShare.pro +++ b/retroshare-gui/src/RetroShare.pro @@ -11,7 +11,7 @@ RCC_DIR = temp/qrc UI_DIR = temp/ui MOC_DIR = temp/moc -#CONFIG += debug +CONFIG += debug debug { QMAKE_CFLAGS += -g } diff --git a/retroshare-gui/src/gui/ForumsDialog.cpp b/retroshare-gui/src/gui/ForumsDialog.cpp index 0912ae407..996b69bd6 100644 --- a/retroshare-gui/src/gui/ForumsDialog.cpp +++ b/retroshare-gui/src/gui/ForumsDialog.cpp @@ -134,6 +134,8 @@ ForumsDialog::ForumsDialog(QWidget *parent) m_bProcessSettings = false; subscribeFlags = 0; + mFillthreadCount = 0; + connect( ui.forumTreeWidget, SIGNAL( treeCustomContextMenuRequested( QPoint ) ), this, SLOT( forumListCustomPopupMenu( QPoint ) ) ); connect( ui.threadTreeWidget, SIGNAL( customContextMenuRequested( QPoint ) ), this, SLOT( threadListCustomPopupMenu( QPoint ) ) ); @@ -465,7 +467,9 @@ void ForumsDialog::updateDisplay() { std::list forumIds; std::list::iterator it; - if (!rsForums) + + // suspend access to forum while thread is running + if (!rsForums || (mFillthreadCount != 0)) return; if (rsForums->forumsChanged(forumIds)) @@ -770,9 +774,11 @@ void ForumsDialog::fillThreadFinished() thread = NULL; } + mFillthreadCount -= 1; #ifdef DEBUG_FORUMS std::cerr << "ForumsDialog::fillThreadFinished done" << std::endl; #endif + } void ForumsDialog::fillThreadProgress(int current, int count) @@ -866,6 +872,8 @@ void ForumsDialog::insertThreads() std::cerr << "ForumsDialog::insertThreads() Start fill thread" << std::endl; #endif + mFillthreadCount +=1; + // start thread fillThread->start(); } diff --git a/retroshare-gui/src/gui/ForumsDialog.h b/retroshare-gui/src/gui/ForumsDialog.h index 8b9074f0e..9a6850d87 100644 --- a/retroshare-gui/src/gui/ForumsDialog.h +++ b/retroshare-gui/src/gui/ForumsDialog.h @@ -136,6 +136,7 @@ private: QFont m_ForumNameFont; int lastViewType; std::string lastForumID; + int mFillthreadCount; ForumsFillThread *fillThread;