msg history space optimisation branch

git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-msghistory@4487 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
chrisparker126 2011-07-24 18:00:50 +00:00
parent b7cbe7045c
commit 2f315cc0fc
15 changed files with 681 additions and 224 deletions

View file

@ -1,14 +1,13 @@
TEMPLATE = lib
#CONFIG += staticlib release
#CONFIG += staticlib testnetwork
CONFIG += staticlib bitdht
CONFIG += staticlib bitdht debug
CONFIG -= qt
TARGET = retroshare
# Beware: All data of the stripped services are lost
#CONFIG += minimal
DEFINES *= PQI_DISABLE_TUNNEL
#ENABLE_CACHE_OPT
DEFINES *= PQI_DISABLE_TUNNEL ENABLE_CACHE_OPT
minimal {
CONFIG -= use_blogs

View file

@ -740,6 +740,7 @@ bool p3Config::loadAttempt(const std::string& cfgFname,const std::string& signFn
uint32_t bioflags = BIN_FLAGS_HASH_DATA | BIN_FLAGS_READABLE;
uint32_t stream_flags = BIN_FLAGS_READABLE;
// bio is cleaned up after stream goes out of scope
BinEncryptedFileInterface *bio = new BinEncryptedFileInterface(cfgFname.c_str(), bioflags);
pqiSSLstore stream(setupSerialiser(), "CONFIG", bio, stream_flags);

View file

@ -41,7 +41,16 @@
class pqistore: public PQInterface
{
public:
/*!
* bio passed must remain valid throughout the lifetime of a pqistore instance
* @param rss
* @param srcId
* @param bio_in ownership passes to pqistore at construction; deleted in its destructor
* @param bio_flagsin
*/
pqistore(RsSerialiser *rss, const std::string &srcId, BinInterface *bio_in, int bio_flagsin);
virtual ~pqistore();
// PQInterface
@ -84,6 +93,13 @@ class pqiSSLstore: public pqistore
public:
/*!
*
* @param rss
* @param srcId
* @param bio_in deleted when pqiSSLstore's destructor is called
* @param bio_flagsin
*/
pqiSSLstore(RsSerialiser *rss, std::string srcId, BinEncryptedFileInterface *bio_in, int bio_flagsin);
virtual ~pqiSSLstore();
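The ownership rule in the two constructors above is easy to get wrong, so here is a minimal write-side sketch of it (header paths and the archive filename are assumptions of mine; the class, flag, and method names come from elsewhere in this diff):

#include <list>
#include "pqi/pqistore.h"   // assumed header path
#include "pqi/pqibin.h"     // assumed header path

void writeEncrypted(RsSerialiser *rss, std::list<RsItem*> &items)
{
	uint32_t bioflags = BIN_FLAGS_HASH_DATA | BIN_FLAGS_WRITEABLE;
	// heap-allocate the bio but do NOT delete it here: per the notes above,
	// ownership passes to the store, whose destructor cleans it up
	BinEncryptedFileInterface *bio =
		new BinEncryptedFileInterface("archive.dist", bioflags);
	pqiSSLstore store(rss, "CONFIG", bio, BIN_FLAGS_WRITEABLE);
	store.encryptedSendItems(items);
}	// store goes out of scope here and deletes bio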

View file

@ -115,6 +115,11 @@ std::ostream &operator<<(std::ostream &out, const ForumMsgInfo &info);
class RsForums;
extern RsForums *rsForums;
/*!
* @brief interface to rs forums, a distributed cache based service
* @note avoid requesting available messages until the user explicitly asks for them; requesting any
* information on messages for a grpId is expensive memory-wise.
*/
class RsForums
{
public:
@ -148,6 +153,13 @@ virtual bool ForumMessageSend(ForumMsgInfo &info) = 0;
virtual bool forumRestoreKeys(const std::string& fId) = 0;
virtual bool forumSubscribe(const std::string &fId, bool subscribe) = 0;
/*!
* returns new and unread msg counts for a forum
* @param fId forumId to retrieve msg count for; leave blank to get msg count for all subscribed forums
* @param newCount number of new msgs
* @param unreadCount number of unread msgs
* @return true if successful, false otherwise
*/
virtual bool getMessageCount(const std::string &fId, unsigned int &newCount, unsigned int &unreadCount) = 0;
/****************************************/
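A short caller-side sketch of the count API above (the header path is an assumption; passing an empty fId follows the @param note, and rsForums is the singleton declared earlier in this file):

#include <iostream>
#include "retroshare/rsforums.h"  // assumed header path

void printForumCounts()
{
	unsigned int newCount = 0, unreadCount = 0;
	// empty fId => counts aggregated over all subscribed forums
	if (rsForums && rsForums->getMessageCount("", newCount, unreadCount))
		std::cout << "new: " << newCount << " unread: " << unreadCount << std::endl;
}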

View file

@ -858,6 +858,134 @@ RsDistribConfigData *RsDistribSerialiser::deserialiseConfigData(void *data, uint
}
uint32_t RsDistribSerialiser::sizeMsgHstry(RsDistribMsgHstry *item)
{
uint32_t s = 8; /* header */
/* RsDistribSignedMsg stuff */
s += GetTlvStringSize(item->grpId);
s += GetTlvStringSize(item->msgHstryFileHash);
s += GetTlvStringSize(item->msgHstryFilePath);
return s;
}
/* serialise the data to the buffer */
bool RsDistribSerialiser::serialiseMsgHstry(RsDistribMsgHstry *item, void *data, uint32_t *pktsize)
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsDistribSerialiser::serialiseMsgHstry()" << std::endl;
#endif
uint32_t tlvsize = sizeMsgHstry(item);
uint32_t offset = 0;
if (*pktsize < tlvsize)
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsDistribSerialiser::serialiseMsgHstry() FAIL no space" << std::endl;
#endif
return false; /* not enough space */
}
*pktsize = tlvsize;
bool ok = true;
ok &= setRsItemHeader(data, tlvsize, item->PacketId(), tlvsize);
/* skip the header */
offset += 8;
/* grpId */
ok &= SetTlvString(data, tlvsize, &offset, TLV_TYPE_STR_GROUPID, item->grpId);
ok &= SetTlvString(data, tlvsize, &offset, TLV_TYPE_STR_HASH_SHA1, item->msgHstryFileHash);
ok &= SetTlvString(data, tlvsize, &offset, TLV_TYPE_STR_PATH, item->msgHstryFilePath);
if (offset != tlvsize)
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsDistribSerialiser::serialiseMsgHstry() FAIL Size Error! " << std::endl;
#endif
ok = false;
}
#ifdef RSSERIAL_DEBUG
if (!ok)
{
std::cerr << "RsDistribSerialiser::serialiseMsgHstry() NOK" << std::endl;
}
#endif
return ok;
}
RsDistribMsgHstry *RsDistribSerialiser::deserialiseMsgHstry(void *data, uint32_t *pktsize)
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsDistribSerialiser::deserialiseMsgHstry()" << std::endl;
#endif
/* get the type and size */
uint32_t rstype = getRsItemId(data);
uint32_t rssize = getRsItemSize(data);
uint32_t offset = 0;
if ((RS_PKT_VERSION_SERVICE != getRsItemVersion(rstype)) ||
(SERVICE_TYPE != getRsItemService(rstype)) ||
(RS_PKT_SUBTYPE_DISTRIB_MSG_HSTRY != getRsItemSubType(rstype)))
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsDistribSerialiser::deserialiseMsgHstry() Wrong Type" << std::endl;
#endif
return NULL; /* wrong type */
}
if (*pktsize < rssize) /* check size */
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsDistribSerialiser::deserialiseMsgHstry() Wrong Size" << std::endl;
#endif
return NULL; /* not enough data */
}
/* set the packet length */
*pktsize = rssize;
bool ok = true;
RsDistribMsgHstry* item = new RsDistribMsgHstry();
/* skip the header */
offset += 8;
ok &= GetTlvString(data, rssize, &offset, TLV_TYPE_STR_GROUPID, item->grpId);
ok &= GetTlvString(data, rssize, &offset, TLV_TYPE_STR_HASH_SHA1, item->msgHstryFileHash);
ok &= GetTlvString(data, rssize, &offset, TLV_TYPE_STR_PATH, item->msgHstryFilePath);
if (offset != rssize)
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsDistribSerialiser::deserialiseMsgHstry() size mismatch" << std::endl;
#endif
/* error */
delete item;
return NULL;
}
if (!ok)
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsDistribSerialiser::deserialiseMsgHstry() NOK" << std::endl;
#endif
delete item;
return NULL;
}
return item;
}
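The two functions above are symmetric, so a quick round-trip sketch shows the expected contract (values are hypothetical, the members are assumed publicly accessible, and the serialiser is assumed constructed for RS_SERVICE_TYPE_DISTRIB so the subtype check passes; sizeMsgHstry gives the exact buffer size, an 8-byte header plus three TLV strings):

#include <vector>
#include <stdint.h>

bool roundTripMsgHstry(RsDistribSerialiser &ser)
{
	RsDistribMsgHstry out;
	out.grpId = "grp01";                              // hypothetical values
	out.msgHstryFilePath = "/tmp/grp01-archive.dist";
	out.msgHstryFileHash = "da39a3ee";
	uint32_t size = ser.sizeMsgHstry(&out);
	std::vector<uint8_t> buf(size);
	if (!ser.serialiseMsgHstry(&out, &buf[0], &size))
		return false;
	RsDistribMsgHstry *in = ser.deserialiseMsgHstry(&buf[0], &size);
	bool ok = (in != NULL) && (in->grpId == out.grpId);
	delete in;
	return ok;
}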
@ -867,6 +995,8 @@ uint32_t RsDistribSerialiser::size(RsItem *i)
RsDistribGrpKey *dgk;
RsDistribSignedMsg *dsm;
RsDistribConfigData *dsd;
RsDistribMsgHstry *dmh;
/* in order of frequency */
if (NULL != (dsm = dynamic_cast<RsDistribSignedMsg *>(i)))
@ -885,6 +1015,10 @@ uint32_t RsDistribSerialiser::size(RsItem *i)
{
return sizeConfigData(dsd);
}
else if(NULL != (dmh = dynamic_cast<RsDistribMsgHstry *>(i)))
{
return sizeMsgHstry(dmh);
}
return 0;
}
@ -898,6 +1032,7 @@ bool RsDistribSerialiser::serialise(RsItem *i, void *data, uint32_t *pktsize
RsDistribGrpKey *dgk;
RsDistribSignedMsg *dsm;
RsDistribConfigData *dsd;
RsDistribMsgHstry *dmh;
if (NULL != (dsm = dynamic_cast<RsDistribSignedMsg *>(i)))
{
@ -915,6 +1050,10 @@ bool RsDistribSerialiser::serialise(RsItem *i, void *data, uint32_t *pktsize
{
return serialiseConfigData(dsd, data, pktsize);
}
else if(NULL != (dmh = dynamic_cast<RsDistribMsgHstry *>(i)))
{
return serialiseMsgHstry(dmh, data, pktsize);
}
return false;
}
@ -946,6 +1085,9 @@ RsItem *RsDistribSerialiser::deserialise(void *data, uint32_t *pktsize)
case RS_PKT_SUBTYPE_DISTRIB_CONFIG_DATA:
return deserialiseConfigData(data, pktsize);
break;
case RS_PKT_SUBTYPE_DISTRIB_MSG_HSTRY:
return deserialiseMsgHstry(data, pktsize);
break;
default:
return NULL;
break;

View file

@ -38,6 +38,7 @@ const uint8_t RS_PKT_SUBTYPE_DISTRIB_GRP = 0x01;
const uint8_t RS_PKT_SUBTYPE_DISTRIB_GRP_KEY = 0x02;
const uint8_t RS_PKT_SUBTYPE_DISTRIB_SIGNED_MSG = 0x03;
const uint8_t RS_PKT_SUBTYPE_DISTRIB_CONFIG_DATA = 0x04;
const uint8_t RS_PKT_SUBTYPE_DISTRIB_MSG_HSTRY = 0x10;
/**************************************************************************/
@ -209,6 +210,30 @@ virtual std::ostream& print(std::ostream &out, uint16_t indent = 0);
RsTlvSecurityKey key;
};
/*!
* stores the location and hash of the file holding a group's archived msgs
*/
class RsDistribMsgHstry: public RsItem
{
public:
RsDistribMsgHstry(uint16_t service_type)
:RsItem(RS_PKT_VERSION_SERVICE, service_type, RS_PKT_SUBTYPE_DISTRIB_MSG_HSTRY)
{ return; }
RsDistribMsgHstry()
:RsItem(RS_PKT_VERSION_SERVICE, RS_SERVICE_TYPE_DISTRIB, RS_PKT_SUBTYPE_DISTRIB_MSG_HSTRY)
{ return; }
virtual ~RsDistribMsgHstry() { return; }
virtual void clear();
virtual std::ostream& print(std::ostream &out, uint16_t indent = 0);
std::string grpId; /* Grp Id */
std::string msgHstryFilePath; // path to where archived msgs are stored
std::string msgHstryFileHash; // hash of file at file path
};
class RsDistribSerialiser: public RsSerialType
{
@ -248,6 +273,10 @@ virtual uint32_t sizeConfigData(RsDistribConfigData *);
virtual bool serialiseConfigData(RsDistribConfigData *item, void *data, uint32_t *size);
virtual RsDistribConfigData *deserialiseConfigData(void* data, uint32_t *size);
/* For RS_PKT_SUBTYPE_DISTRIB_MSG_HSTRY */
virtual uint32_t sizeMsgHstry(RsDistribMsgHstry *);
virtual bool serialiseMsgHstry(RsDistribMsgHstry *item, void *data, uint32_t *size);
virtual RsDistribMsgHstry *deserialiseMsgHstry(void* data, uint32_t *size);
const uint16_t SERVICE_TYPE;

View file

@ -81,7 +81,7 @@ public:
std::string forumId;
/// a map which contains the read for messages within a forum
/// a map (msgId, status bit-field) which contains the read status for messages within a forum
std::map<std::string, uint32_t> msgReadStatus;
};

View file

@ -68,14 +68,16 @@ RsBlogs *rsBlogs = NULL;
/* Blogs will be initially stored for 1 year
* remember 2^16 = 64K max units in store period.
* PUBPERIOD * 2^16 = max STORE PERIOD */
#define BLOG_STOREPERIOD (90*24*3600) /* 90 * 24 * 3600 - secs in 90 days */
#define BLOG_STOREPERIOD (60*24*3600) /* 60 * 24 * 3600 - secs in 60 days */
#define BLOG_PUBPERIOD 600 /* 10 minutes ... (max = 455 days) */
#define BLOG_ARCHIVE_PERIOD (180*24*3600) /* 180 * 24 * 3600 - secs in 180 days */
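The "2^16 = 64K max units" note above is plain arithmetic: the maximum representable store window is PUBPERIOD * 65536, so 600 s yields 39,321,600 s, about 455 days — the figure quoted beside the PUBPERIOD defines. A compile-time guard in this spirit is my own sketch and assumes a C++11 compiler (this tree predates static_assert):

static_assert(BLOG_STOREPERIOD / BLOG_PUBPERIOD <= (1 << 16),
	"store period must fit in 2^16 publication units");
static_assert(BLOG_ARCHIVE_PERIOD >= BLOG_STOREPERIOD,
	"archive window begins where the store window ends");
// e.g. 600 s * 65536 = 39,321,600 s, roughly 455 days of addressable history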
p3Blogs::p3Blogs(uint16_t type, CacheStrapper *cs,
CacheTransfer *cft,
std::string srcdir, std::string storedir)
:p3GroupDistrib(type, cs, cft, srcdir, storedir, "",
CONFIG_TYPE_QBLOG, BLOG_STOREPERIOD, BLOG_PUBPERIOD)
CONFIG_TYPE_QBLOG, BLOG_STOREPERIOD, BLOG_ARCHIVE_PERIOD,
BLOG_PUBPERIOD)
{
return;
}

View file

@ -66,13 +66,15 @@ RsChannels *rsChannels = NULL;
* PUBPERIOD * 2^16 = max STORE PERIOD */
#define CHANNEL_STOREPERIOD (30*24*3600) /* 30 * 24 * 3600 - secs in a 30 day month */
#define CHANNEL_PUBPERIOD 120 /* 2 minutes ... (max = 455 days) */
#define CHANNEL_ARCHIVE_PERIOD (180*24*3600)
#define MAX_AUTO_DL 1E9 /* auto download of attachment limit; 1 GIG */
p3Channels::p3Channels(uint16_t type, CacheStrapper *cs,
CacheTransfer *cft, RsFiles *files,
std::string srcdir, std::string storedir, std::string chanDir)
:p3GroupDistrib(type, cs, cft, srcdir, storedir, chanDir,
CONFIG_TYPE_CHANNELS, CHANNEL_STOREPERIOD, CHANNEL_PUBPERIOD),
CONFIG_TYPE_CHANNELS, CHANNEL_STOREPERIOD, CHANNEL_ARCHIVE_PERIOD,
CHANNEL_PUBPERIOD),
mRsFiles(files),
mChannelsDir(chanDir)
{

View file

@ -95,17 +95,22 @@ GroupInfo::~GroupInfo()
delete it->second ;
}
RsDistribMsgArchive::RsDistribMsgArchive(){
loaded = false;
toArchive = false; // initialise both flags; toArchive was previously left unset
}
p3GroupDistrib::p3GroupDistrib(uint16_t subtype,
CacheStrapper *cs, CacheTransfer *cft,
std::string sourcedir, std::string storedir,
std::string keyBackUpDir, uint32_t configId,
uint32_t storePeriod, uint32_t pubPeriod)
uint32_t storePeriod, uint32_t archivePeriod, uint32_t pubPeriod)
:CacheSource(subtype, true, cs, sourcedir),
CacheStore(subtype, true, cs, cft, storedir),
p3Config(configId), p3ThreadedService(subtype),
mHistoricalCaches(true),
mStorePeriod(storePeriod),
mArchivePeriod(archivePeriod),
mPubPeriod(pubPeriod),
mLastPublishTime(0),
mMaxCacheSubId(1),
@ -199,12 +204,12 @@ int p3GroupDistrib::tick()
receivePubKeys();
}
// update cache document every 1 minute (5 mins in production)
// update cache document every 10 seconds for now (should be 5 mins in production)
// after historical files have loaded and there is reason to
bool updateCacheDoc = false;
{
RsStackMutex stack(distribMtx);
updateCacheDoc = (now > (time_t) (mLastCacheDocUpdate + 30));
updateCacheDoc = (now > (time_t) (mLastCacheDocUpdate + 10));
updateCacheDoc &= !mHistoricalCaches && mUpdateCacheDoc && mHistoricalCachesLoaded;
#ifdef DISTRIB_HISTORY_DEBUG
std::cerr << "num pending grps: " << mGrpHistPending.size() << std::endl;
@ -213,13 +218,11 @@ int p3GroupDistrib::tick()
#endif
}
#ifdef ENABLE_CACHE_OPT
if(updateCacheDoc){
std::cerr << "count: " << mCount << std::endl;
updateCacheDocument();
}
#endif
return 0;
}
@ -316,7 +319,7 @@ void p3GroupDistrib::updateCacheDocument()
std::vector<grpNodePair> grpNodes;
std::string failedCacheId = FAILED_CACHE_CONT;
// failed cache content node is has not been created add to doc
// failed cache content node has not been created, so add to doc
if(mCacheTable.find(failedCacheId) == mCacheTable.end()){
mCacheDoc.append_child("group");
@ -384,7 +387,7 @@ void p3GroupDistrib::updateCacheDocument()
// add groups to cache table
locked_updateCacheTableGrp(grpNodes, false);
//grpNodeIter.clear();
std::map<std::string, std::set<pCacheId> > msgCacheMap;
pugi::xml_node nodeIter;
@ -402,7 +405,6 @@ void p3GroupDistrib::updateCacheDocument()
pCacheId pCid;
int count = 0;
// int count2 = 0, count3 = 0;
for(; msgIt != mMsgHistPending.end(); msgIt++)
{
@ -415,17 +417,6 @@ void p3GroupDistrib::updateCacheDocument()
pCid = pCacheId(msgIt->second.first,
msgIt->second.second);
// ensure you don't add cache ids twice to same group
// // by checking cache table and current msg additions
// if(nodeCache_iter->second.cIdSet.find(pCid) !=
// nodeCache_iter->second.cIdSet.end())
// count2++;
//
// if(msgCacheMap[msgIt->first].find(pCid) != msgCacheMap[msgIt->first].end())
// count3++;
nodeIter = nodeCache_iter->second.node;
messages_node = nodeIter.child("messages");
@ -446,7 +437,6 @@ void p3GroupDistrib::updateCacheDocument()
// add msg to grp set
msgCacheMap[msgIt->first].insert(pCid);
count++;
}
else{
@ -462,8 +452,6 @@ void p3GroupDistrib::updateCacheDocument()
}
}
// now update cache table by tagging msg cache ids to their
// respective groups
locked_updateCacheTableMsg(msgCacheMap);
@ -493,6 +481,54 @@ void p3GroupDistrib::updateCacheDocument()
return;
}
struct by_cacheid
{
bool operator()(pugi::xml_node node) const
{
bool cacheIdEqual = true;
cacheIdEqual &= (node.child_value("subId") == subId);
cacheIdEqual &= (node.child_value("pId") == peerId);
return cacheIdEqual;
}
std::string peerId;
std::string subId;
};
void p3GroupDistrib::locked_removeCacheTableEntry(const pCacheId& pCid)
{
// search through cache document for all entries with this cache id
pugi::xml_node_iterator nit = mCacheDoc.begin();
by_cacheid bCid;
bCid.peerId = pCid.first;
char subIdBuffer[6];
std::string subId;
sprintf(subIdBuffer, "%d", pCid.second);
subId = subIdBuffer;
bCid.subId = subId;
// for each grp, remove message nodes that match pCid
for(; nit != mCacheDoc.end(); nit++)
{
pugi::xml_node msgNode = nit->child("messages");
if(msgNode)
{
while(pugi::xml_node cNode = msgNode.find_child(bCid))
{
msgNode.remove_child(cNode);
}
}
}
locked_buildCacheTable();
return;
}
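For reference, find_child with a predicate object (as used above) returns only the first match per call, which is why the inner while loop keeps calling it until nothing matches. The same pattern in isolation, with node names following the cache document layout in this file:

#include <string>
#include "pugixml.hpp"

// predicate matching a message node by its pId/subId children
struct match_cache
{
	std::string pid, subid;
	bool operator()(pugi::xml_node n) const
	{
		return pid == n.child_value("pId") && subid == n.child_value("subId");
	}
};

void removeAllMatches(pugi::xml_node messages, const match_cache &m)
{
	while (pugi::xml_node hit = messages.find_child(m))
		messages.remove_child(hit);
}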
void p3GroupDistrib::locked_updateCacheTableGrp(const std::vector<grpNodePair>& grpNodes, bool historical)
{
@ -570,6 +606,7 @@ bool p3GroupDistrib::locked_historyCached(const std::string& grpId, bool& cached
std::map<std::string, nodeCache>::iterator cit;
if(mCacheTable.end() != (cit = mCacheTable.find(grpId)))
{
cached = cit->second.cached;
return true;
}
@ -578,6 +615,8 @@ bool p3GroupDistrib::locked_historyCached(const std::string& grpId, bool& cached
return false;
}
bool p3GroupDistrib::locked_historyCached(const pCacheId& cId)
{
@ -605,6 +644,9 @@ bool p3GroupDistrib::locked_buildCacheTable(){
return false;
}
// clear cache table
mCacheTable.clear();
pugi::xml_node_iterator grpIt = mCacheDoc.begin(), msgIt;
pugi::xml_node messages_node;
std::map<std::string, std::set<pCacheId> > msgCacheMap;
@ -662,12 +704,9 @@ bool p3GroupDistrib::locked_buildCacheTable(){
return true;
}
void p3GroupDistrib::locked_processHistoryCached(const std::string& grpId)
void p3GroupDistrib::processHistoryCached(const std::string& grpId)
{
// no processing should be done until cache locations have been stored in memory
if(mHistoricalCaches)
return;
#ifdef DISTRIB_HISTORY_DEBUG
std::cerr << "p3GroupDistrib::locked_processHistoryCached() "
@ -675,25 +714,43 @@ void p3GroupDistrib::locked_processHistoryCached(const std::string& grpId)
#endif
bool cached = true;
locked_historyCached(grpId, cached);
{
RsStackMutex stack(distribMtx);
// no processing should be done until cache locations have been stored in memory
if(mHistoricalCaches)
return;
locked_historyCached(grpId, cached);
}
std::list<CacheData> cDataList;
std::list<CacheData>::iterator cit;
std::string file;
CacheData cDataTemp;
uint16_t cacheType = CacheSource::getCacheType();
// if not history cached then load it
if(!cached)
{
// get list of cache id belonging to grp
locked_getHistoryCacheData(grpId, cDataList);
cit = cDataList.begin();
{
RsStackMutex stack(distribMtx);
locked_getHistoryCacheData(grpId, cDataList);
for(; cit != cDataList.end(); cit++){
cit = cDataList.begin();
for(; cit != cDataList.end(); cit++){
cit->cid.type = cacheType;
locked_getStoredCache(*cit);
}
}
// now load
for(cit = cDataList.begin(); cit != cDataList.end(); cit++){
cit->cid.type = cacheType;
locked_getStoredCache(*cit);
file = cit->path;
file += "/";
file += cit->name;
@ -701,13 +758,16 @@ void p3GroupDistrib::locked_processHistoryCached(const std::string& grpId)
// note: you could load msgs for a history-cached group that is not loaded,
// but any request for info on the affected grp will consequently load
// all its msgs through this function anyway
locked_loadFileMsgs(file, cit->cid.subid, cit->pid, cit->recvd, false, true);
loadFileMsgs(file, cit->cid.subid, cit->pid, cit->recvd, (cit->pid == mOwnId), true, true);
}
}
locked_updateCacheTableEntry(grpId, true);
{
RsStackMutex stack(distribMtx);
locked_updateCacheTableEntry(grpId, true);
}
return;
}
@ -768,8 +828,7 @@ void p3GroupDistrib::locked_getHistoryCacheData(const std::string& grpId, std::l
}
else
locked_getStoredCache(cDataTemp);
cDataSet.push_back(cDataTemp);
cDataSet.push_back(cDataTemp);
}
}
else
@ -831,6 +890,168 @@ bool p3GroupDistrib::locked_loadHistoryCacheFile()
return ok;
}
bool p3GroupDistrib::loadArchive(const std::string& grpId)
{
std::string filename;
msgArchMap::iterator it;
{
RsStackMutex stack(distribMtx);
it = mMsgArchive.find(grpId);
if(it != mMsgArchive.end())
{
filename = it->second->msgFilePath;
}
else
{
return false;
}
}
uint32_t bioflags = BIN_FLAGS_HASH_DATA | BIN_FLAGS_READABLE;
uint32_t stream_flags = BIN_FLAGS_READABLE;
BinEncryptedFileInterface *bio = new BinEncryptedFileInterface(filename.c_str(), bioflags);
pqiSSLstore stream(setupSerialiser(), "CONFIG", bio, stream_flags);
std::list<RsItem*> load;
if(!stream.getEncryptedItems(load))
{
#ifdef DISTRIB_ARCH_DEBUG
std::cerr << "p3Distrib::loadArchive() Error occurred trying to msg Archive Item" << std::endl;
#endif
return false;
}
RsStackMutex stack(distribMtx);
if(it->second->msgFileHash != bio->gethash())
{
#ifdef DISTRIB_ARCH_DEBUG
std::cerr << "p3Distrib::loadArchive() Error occurred archived File's Hash invalid" << std::endl;
#endif
return false;
}
std::list<RsItem*>::iterator lit = load.begin(); // renamed: must not shadow the map iterator 'it' above
RsDistribSignedMsg* rsdm = NULL;
for(;lit!=load.end(); lit++)
{
if(NULL != (rsdm = dynamic_cast<RsDistribSignedMsg*>(*lit))){
it->second->msgs.push_back(rsdm);
}
else
{
if(*lit)
delete *lit;
}
}
it->second->loaded = true;
return true;
}
bool p3GroupDistrib::locked_archiveMsg(const std::string& grpId,
RsDistribSignedMsg* msg)
{
// check if msg is within archive period
/* check timestamp */
time_t now = time(NULL);
uint32_t min = now - mArchivePeriod;
uint32_t max = now - mStorePeriod;
if ((msg->timestamp < min) || (msg->timestamp > max))
{
#ifdef DISTRIB_ARCH_DEBUG
std::cerr << "p3Distrib::locked_archiveMsg() Error, Msg to old to Archive " << std::endl;
#endif
return false;
}
msgArchMap::iterator it = mMsgArchive.find(grpId);
// check an entry exists already
if(it != mMsgArchive.end())
{
it->second->msgs.push_back(msg);
it->second->toArchive = true;
}
else // if not then make one
{
RsDistribMsgArchive* msgArch = new RsDistribMsgArchive();
msgArch->msgs.push_back(msg);
msgArch->grpId = grpId;
msgArch->loaded = false;
msgArch->toArchive = true;
mMsgArchive.insert(std::pair<std::string, RsDistribMsgArchive*>(
grpId, msgArch));
}
return true;
}
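The min/max window above reads backwards at first glance: a msg is archived only when it is older than the live store window but still younger than the archive limit. A hedged restatement (the helper name is mine; the worked numbers use the forum defaults elsewhere in this commit):

#include <ctime>
#include <stdint.h>

static bool inArchiveWindow(time_t ts, time_t now,
	uint32_t storePeriod, uint32_t archivePeriod)
{
	time_t oldest = now - archivePeriod; // older than this: dropped entirely
	time_t newest = now - storePeriod;   // newer than this: stays in live store
	return (ts >= oldest) && (ts <= newest);
}
// With store = 60 days and archive = 365 days: a msg stamped 90 days ago is
// archived, 30 days ago stays live, 400 days ago falls outside both windows.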
bool p3GroupDistrib::sendArchiveToFile(RsDistribMsgArchive* msgArch)
{
std::string filename = mKeyBackUpDir + "/grp-" + msgArch->grpId + "-archive.dist";
// encrypted storage
uint32_t bioflags = BIN_FLAGS_HASH_DATA | BIN_FLAGS_WRITEABLE;
uint32_t stream_flags = BIN_FLAGS_WRITEABLE;
stream_flags |= BIN_FLAGS_NO_DELETE;
BinEncryptedFileInterface *bio = new BinEncryptedFileInterface(filename.c_str(), bioflags);
pqiSSLstore *stream = new pqiSSLstore(setupSerialiser(), "CONFIG", bio, stream_flags);
bool written = stream->encryptedSendItems(msgArch->msgs);
msgArch->msgFileHash = bio->gethash();
if(msgArch->msgFileHash.empty())
return false;
msgArch->msgFilePath = filename;
return written;
}
void p3GroupDistrib::archiveRun()
{
// quite expensive
RsStackMutex stack(distribMtx);
msgArchMap::iterator it = mMsgArchive.begin();
// go through and archive all files
for(; it!=mMsgArchive.end(); it++)
{
if(it->second->toArchive) // only archive entries flagged for archiving
{
sendArchiveToFile(it->second);
it->second->toArchive = false;
}
}
// indicate config to save meta data (file location and grpId pair)
IndicateConfigChanged();
return;
}
bool p3GroupDistrib::locked_saveHistoryCacheFile()
{
@ -839,7 +1060,7 @@ bool p3GroupDistrib::locked_saveHistoryCacheFile()
return false;
std::string hFileName = mKeyBackUpDir + "/" + HIST_CACHE_FNAME;
std::ofstream hFile(hFileName.c_str(), std::ios::binary | std::ios::out);
std::ofstream hFile(hFileName.c_str(), std::ios::binary | std::ios::out);
std::ostringstream cacheStream;
char* fileBuffer = NULL;
int streamLength;
@ -1007,7 +1228,7 @@ int p3GroupDistrib::loadAnyCache(const CacheData &data, bool local, bool his
}
else
{
loadFileMsgs(file, data.cid.subid, data.pid, data.recvd, local, historical);
loadFileMsgs(file, data.cid.subid, data.pid, data.recvd, local, historical, false);
}
return true;
@ -1098,7 +1319,8 @@ void p3GroupDistrib::loadFileGroups(const std::string &filename, const std::stri
return;
}
void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical)
void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical,
bool cacheLoad)
{
#ifdef DISTRIB_DEBUG
@ -1109,24 +1331,31 @@ void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSub
time_t now = time(NULL);
bool cache = false;
#ifdef ENABLE_CACHE_OPT
// if cache id exists in cache table exit
{
RsStackMutex stack(distribMtx);
// if this is a cache load, proceed; otherwise check whether the
// cache id already exists in the cache table, and if so don't load
if(locked_historyCached(pCacheId(src, cacheSubId))){
return;
}
else
if(!cacheLoad)
{
cache = true;
if(historical && locked_historyCached(pCacheId(src, cacheSubId)))
{
return;
}
else
{
cache = true;
}
}
}
#endif
// link grp to cache id (only one cache id, so doesn't matter if one grp comes out twice
// with same cache id)
std::map<std::string, pCacheId> msgCacheMap;
// if message loaded before check failed cache
pCacheId failedCache = pCacheId(src, cacheSubId);
/* create the serialiser to load msgs */
BinInterface *bio = new BinFileInterface(filename.c_str(), BIN_FLAGS_READABLE);
@ -1229,133 +1458,6 @@ void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSub
return;
}
//TODO make carbon copy of sister
void p3GroupDistrib::locked_loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical)
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadFileMsgs()";
std::cerr << std::endl;
#endif
time_t now = time(NULL);
bool cache = false;
#ifdef ENABLE_CACHE_OPT
// if cache id exists in cache table exit
if(!historical){
if(locked_historyCached(pCacheId(src, cacheSubId))){
return;
}
else
{
cache = true;
}
}
#endif
// link grp to cache id (only one cache id, so doesn't matter if one grp comes out twice
// with same cache id)
std::map<std::string, pCacheId> msgCacheMap;
pCacheId failedCache = pCacheId(src, cacheSubId);
/* create the serialiser to load msgs */
BinInterface *bio = new BinFileInterface(filename.c_str(), BIN_FLAGS_READABLE);
pqistore *store = createStore(bio, src, BIN_FLAGS_READABLE);
#ifdef DISTRIB_DEBUG
std::cerr << "loading file " << filename << std::endl ;
#endif
RsItem *item;
RsDistribSignedMsg *newMsg;
std::string grpId;
while(NULL != (item = store->GetItem()))
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadFileMsgs() Got Item:";
std::cerr << std::endl;
item->print(std::cerr, 10);
std::cerr << std::endl;
#endif
if ((newMsg = dynamic_cast<RsDistribSignedMsg *>(item)))
{
grpId = newMsg->grpId;
if(locked_loadMsg(newMsg, src, local, historical))
{
if(cache)
{
msgCacheMap.insert(grpCachePair(grpId, pCacheId(src, cacheSubId)));
}
}
}
else
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadFileMsgs() Unexpected Item - deleting";
std::cerr << std::endl;
#endif
/* wrong message type */
delete item;
}
}
std::map<std::string, pCacheId>::iterator mit;
if(cache){
mit = msgCacheMap.begin();
for(;mit != msgCacheMap.end(); mit++)
{
mMsgHistPending.push_back(grpCachePair(mit->first, mit->second));
}
mUpdateCacheDoc = true;
if(!msgCacheMap.empty())
mCount++;
std::string failedCacheId = FAILED_CACHE_CONT;
// if msg cache map is empty then cache id failed
if(msgCacheMap.empty())
mMsgHistPending.push_back(grpCachePair(failedCacheId, failedCache));
}
if (local)
{
/* now we create a map of time -> subid
* This is used to determine the newest and the oldest items
*/
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadFileMsgs() Updating Local TimeStamps";
std::cerr << std::endl;
std::cerr << "p3GroupDistrib::loadFileMsgs() CacheSubId: " << cacheSubId << " recvd: " << ts;
std::cerr << std::endl;
#endif
mLocalCacheTs[ts] = cacheSubId;
if (cacheSubId > mMaxCacheSubId)
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadFileMsgs() New Max CacheSubId";
std::cerr << std::endl;
#endif
mMaxCacheSubId = cacheSubId;
}
if (((time_t) ts < now) && ((time_t) ts > mLastPublishTime))
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadFileMsgs() New LastPublishTime";
std::cerr << std::endl;
#endif
mLastPublishTime = ts;
}
}
delete store;
return;
}
/***************************************************************************************/
/***************************************************************************************/
/********************** load Cache Msgs ***************************************/
@ -1671,9 +1773,15 @@ bool p3GroupDistrib::loadMsg(RsDistribSignedMsg *newMsg, const std::string &src,
std::cerr << "p3GroupDistrib::loadMsg() check failed" << std::endl;
std::cerr << std::endl;
#endif
delete newMsg;
delete msg;
return false;
// out of range; archive if subscribed, and if archiving
// succeeds allow loading to continue
if(!locked_archiveMsg(newMsg->grpId, newMsg)
&& (git->second.flags & RS_DISTRIB_SUBSCRIBED)){
delete newMsg;
delete msg;
return false;
}
}
/* accept message */
@ -1987,6 +2095,9 @@ void p3GroupDistrib::locked_publishPendingMsgs()
newCache.cid.type = CacheSource::getCacheType();
newCache.cid.subid = locked_determineCacheSubId();
// remove old cache entry using this pid
locked_removeCacheTableEntry(pCacheId(newCache.pid, newCache.cid.subid));
/* create filename */
std::string path = CacheSource::getCacheDir();
std::ostringstream out;
@ -2001,6 +2112,11 @@ void p3GroupDistrib::locked_publishPendingMsgs()
bool resave = false;
std::list<RsDistribSignedMsg *>::iterator it;
// for cache opt
std::list<grpCachePair> gcpList;
pCacheId pcId(newCache.pid, newCache.cid.subid);
for(it = mPendingPublish.begin(); it != mPendingPublish.end(); it++)
{
#ifdef DISTRIB_DEBUG
@ -2022,6 +2138,8 @@ void p3GroupDistrib::locked_publishPendingMsgs()
// prevent sending original source of message to peers
(*it)->PeerId(mOwnId);
gcpList.push_back(grpCachePair((*it)->grpId, pcId));
if(!store->SendItem(*it)) /* deletes it */
{
ok &= false;
@ -2060,6 +2178,7 @@ void p3GroupDistrib::locked_publishPendingMsgs()
if(ok)
refreshCache(newCache);
std::list<grpCachePair>::iterator git = gcpList.begin();
if (ok && resave)
{
@ -2069,7 +2188,15 @@ void p3GroupDistrib::locked_publishPendingMsgs()
#endif
/* flag to store config (saying we've published messages) */
IndicateConfigChanged(); /**** INDICATE CONFIG CHANGED! *****/
// add new cache to cache opt doc
for(;git != gcpList.end(); git++)
mMsgHistPending.push_back(*git);
mUpdateCacheDoc = true;
}
}
@ -2299,11 +2426,12 @@ void p3GroupDistrib::getPopularGroupList(uint32_t popMin, uint32_t popMax, std::
bool p3GroupDistrib::getAllMsgList(const std::string& grpId, std::list<std::string> &msgIds)
{
processHistoryCached(grpId);
RsStackMutex stack(distribMtx); /************* STACK MUTEX ************/
#ifdef ENABLE_CACHE_OPT
locked_processHistoryCached(grpId);
#endif
std::map<std::string, GroupInfo>::iterator git;
if (mGroups.end() == (git = mGroups.find(grpId)))
@ -2324,11 +2452,11 @@ bool p3GroupDistrib::getAllMsgList(const std::string& grpId, std::list<std::stri
bool p3GroupDistrib::getParentMsgList(const std::string& grpId, const std::string& pId,
std::list<std::string> &msgIds)
{
processHistoryCached(grpId);
RsStackMutex stack(distribMtx); /************* STACK MUTEX ************/
#ifdef ENABLE_CACHE_OPT
locked_processHistoryCached(grpId);
#endif
std::map<std::string, GroupInfo>::iterator git;
if (mGroups.end() == (git = mGroups.find(grpId)))
@ -2389,7 +2517,8 @@ RsDistribMsg *p3GroupDistrib::locked_getGroupMsg(const std::string& grpId, const
/************* ALREADY LOCKED ************/
locked_processHistoryCached(grpId);
// processHistoryCached(grpId);
std::map<std::string, GroupInfo>::iterator git;
if (mGroups.end() == (git = mGroups.find(grpId)))
@ -2678,6 +2807,20 @@ bool p3GroupDistrib::saveList(bool &cleanup, std::list<RsItem *>& saveData)
}
/* save msg history meta data */
msgArchMap::iterator maIt = mMsgArchive.begin();
for(;maIt != mMsgArchive.end(); maIt++)
{
RsDistribMsgHstry* msgHstry = new RsDistribMsgHstry();
msgHstry->grpId = maIt->first;
msgHstry->msgHstryFileHash = maIt->second->msgFileHash;
msgHstry->msgHstryFilePath = maIt->second->msgFilePath;
saveData.push_back(msgHstry);
saveCleanupList.push_back(msgHstry);
}
std::list<RsItem *> childSaveL = childSaveList();
std::list<RsItem *>::iterator cit = childSaveL.begin();
RsSerialType *childSer = createSerialiser();
@ -2700,9 +2843,7 @@ bool p3GroupDistrib::saveList(bool &cleanup, std::list<RsItem *>& saveData)
delete childSer;
// now save history doc
#ifdef ENABLE_CACHE_OPT
locked_saveHistoryCacheFile();
#endif
return true;
}
@ -2726,14 +2867,12 @@ bool p3GroupDistrib::loadList(std::list<RsItem *>& load)
{
std::list<RsItem *>::iterator lit;
#ifdef ENABLE_CACHE_OPT
{
RsStackMutex stack(distribMtx);
if(locked_loadHistoryCacheFile())
locked_buildCacheTable();
}
#endif
/* for child config data */
std::list<RsItem* > childLoadL;
@ -2748,6 +2887,7 @@ bool p3GroupDistrib::loadList(std::list<RsItem *>& load)
RsDistribGrpKey *newKey = NULL;
RsDistribSignedMsg *newMsg = NULL;
RsDistribConfigData* newChildConfig = NULL;
RsDistribMsgHstry* msgHstry = NULL;
if ((newGrp = dynamic_cast<RsDistribGrp *>(*lit)))
@ -2755,7 +2895,6 @@ bool p3GroupDistrib::loadList(std::list<RsItem *>& load)
const std::string &gid = newGrp -> grpId;
if(loadGroup(newGrp, false)){
#ifdef ENABLE_CACHE_OPT
bool cached = false;
RsStackMutex stack(distribMtx);
@ -2765,7 +2904,6 @@ bool p3GroupDistrib::loadList(std::list<RsItem *>& load)
mGrpHistPending.push_back(gcPair);
mUpdateCacheDoc = true;
}
#endif
}
subscribeToGroup(gid, true);
}
@ -2790,6 +2928,24 @@ bool p3GroupDistrib::loadList(std::list<RsItem *>& load)
newMsg->PeerId(mOwnId);
loadMsg(newMsg, mOwnId, false, false); /* false so it'll pushed to PendingPublish list */
}
else if ((msgHstry = dynamic_cast<RsDistribMsgHstry*>(*lit)))
{
RsDistribMsgArchive* msgArch = new RsDistribMsgArchive();
msgArch->grpId = msgHstry->grpId;
msgArch->loaded = false;
msgArch->msgFileHash = msgHstry->msgHstryFileHash;
msgArch->msgFilePath = msgHstry->msgHstryFilePath;
msgArch->toArchive = false;
{
RsStackMutex stack(distribMtx);
mMsgArchive.insert(std::pair<std::string, RsDistribMsgArchive*>(
msgArch->grpId, msgArch));
}
delete msgHstry;
}
else if ((newChildConfig = dynamic_cast<RsDistribConfigData *>(*lit)))
{
RsItem* childConfigItem = childSer->deserialise(newChildConfig->service_data.bin_data,
@ -2798,6 +2954,7 @@ bool p3GroupDistrib::loadList(std::list<RsItem *>& load)
childLoadL.push_back(childConfigItem);
}
}
/* no need to republish until something new comes in */
@ -5092,17 +5249,15 @@ bool p3GroupDistrib::locked_printDummyMsgs(GroupInfo &grp)
bool p3GroupDistrib::getDummyParentMsgList(const std::string& grpId, const std::string& pId, std::list<std::string> &msgIds)
{
// load grp from history cache if not already loaded
processHistoryCached(grpId);
#ifdef DISTRIB_DUMMYMSG_DEBUG
std::cerr << "p3GroupDistrib::getDummyParentMsgList(grpId:" << grpId << "," << pId << ")";
std::cerr << std::endl;
#endif
RsStackMutex stack(distribMtx); /************* STACK MUTEX ************/
// load grp from history cache if not already loaded
#ifdef ENABLE_CACHE_OPT
locked_processHistoryCached(grpId);
#endif
std::map<std::string, GroupInfo>::iterator git;
if (mGroups.end() == (git = mGroups.find(grpId)))
{
@ -5133,7 +5288,9 @@ bool p3GroupDistrib::getDummyParentMsgList(const std::string& grpId, const std::
RsDistribDummyMsg *p3GroupDistrib::locked_getGroupDummyMsg(const std::string& grpId, const std::string& msgId)
{
locked_processHistoryCached(grpId);
// processHistoryCached(grpId);
#ifdef DISTRIB_DUMMYMSG_DEBUG
std::cerr << "p3GroupDistrib::locked_getGroupDummyMsg(grpId:" << grpId << "," << msgId << ")";
std::cerr << std::endl;

View file

typedef std::pair<std::string, pugi::xml_node > grpNodePair; // (grpId, node)
// these make up a cache list
typedef std::pair<std::string, uint16_t> pCacheId; //(pid, subid)
typedef std::pair<std::string, pCacheId> grpCachePair; // (grpid, cid)
typedef std::map<std::string, RsDistribMsgArchive* > msgArchMap;
/*!
* grp node content for faster access
@ -309,15 +310,43 @@ class CacheDataPending
bool mHistorical;
};
class RsDistribMsgArchive
{
public:
RsDistribMsgArchive();
std::list<RsDistribSignedMsg*> msgs;
std::string grpId;
std::string msgFileHash;
std::string msgFilePath;
bool loaded;
bool toArchive;
};
class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, public p3ThreadedService
{
public:
p3GroupDistrib(uint16_t subtype,
/*!
*
* @param subtype service type
* @param cs handle to cache strapper
* @param cft handle to cache transfer, required to correctly initialise p3GroupDistrib
* @param sourcedir directory for remote cache files
* @param storedir directory for local cache files
* @param keyBackUpDir when the key back-up function is invoked, keys are stored here
* @param configId
* @param storePeriod how long local msgs are kept for
* @param archivePeriod how long archived msgs are kept for
* @param pubPeriod length of time interval before pending msgs/grps are published
*/
p3GroupDistrib(uint16_t subtype,
CacheStrapper *cs, CacheTransfer *cft,
std::string sourcedir, std::string storedir, std::string keyBackUpDir,
uint32_t configId,
uint32_t storePeriod, uint32_t pubPeriod);
uint32_t storePeriod, uint32_t archivePeriod, uint32_t pubPeriod);
virtual ~p3GroupDistrib() ;
@ -394,12 +423,6 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu
*/
bool locked_buildCacheTable(void);
/*!
* if grp's message is not loaded, load it, and update cache table
* @param grpId group whose messages to load if not cached
*/
void locked_processHistoryCached(const std::string& grpId);
/*!
* loads cache data which contains location of cache files belonging
@ -419,6 +442,47 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu
*/
bool locked_loadHistoryCacheFile();
/*!
* removes the given cache id and associated msg nodes from
* all grp nodes
* the cache table is updated to reflect the document
* this is costly, and is meant to be called once the store period has elapsed for a cache
* @param pCid the cache id to remove from the cache document
*/
void locked_removeCacheTableEntry(const pCacheId& pCid);
/*!
* archives a msg whose timestamp falls between the store and archive periods
* @param grpId grp the msg belongs to
* @param msg msg to archive
* @return true if the msg was accepted for archiving, false otherwise
*/
bool locked_archiveMsg(const std::string& grpId, RsDistribSignedMsg* msg);
/*!
* loads a grp's archived msgs into memory
* @param grpId grp whose archived msgs to load
* @return false if there are no archived msgs
*/
bool loadArchive(const std::string& grpId);
/*!
* the hash and path for msgArch are set here
* do not call frequently: expensive IO
* @param msgArch the archive to send to file
* @return if archiving to file succeeded
*/
bool sendArchiveToFile(RsDistribMsgArchive* msgArch);
/*!
* to be called periodically; archives flagged
* to be sent to file are written out, and
* IndicateConfigChanged is called to save
* archive file locations
*/
void archiveRun();
private:
/* these lists are filled by the overloaded fns... then cleared by the thread */
@ -430,8 +494,18 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu
/* load cache files */
void loadFileGroups(const std::string &filename, const std::string &src, bool local, bool historical, const pCacheId& cid);
void loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical);
void locked_loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts, bool local, bool historical);
/*!
* @param filename absolute cache file path
* @param cacheSubId cache subid, needed to save cache to history file
* @param src peer src id
* @param ts timestamp
* @param local set to whether it is a local or remote cache
* @param historical set to whether it is an old cache
* @param cacheLoad set if this is a history cache-opt load, to prevent adding it to the cache history again
*/
void loadFileMsgs(const std::string &filename, uint16_t cacheSubId, const std::string &src, uint32_t ts,
bool local, bool historical, bool cacheLoad);
bool backUpKeys(const std::list<RsDistribGrpKey* > &keysToBackUp, std::string grpId);
void locked_sharePubKey();
@ -478,6 +552,11 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu
bool loadGroupKey(RsDistribGrpKey *newKey, bool historical);
/*!
* if grp's message is not loaded, load it, and update cache table
* @param grpId group whose messages to load if not cached
*/
void processHistoryCached(const std::string& grpId);
/***************************************************************************************/
/***************************************************************************************/
@ -658,6 +737,9 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu
void locked_publishPendingMsgs();
/*!
* This function is key to determining how long caches permeate
* a distributed network; after mStorePeriod has elapsed for a message,
* it is overwritten, since its cache subid is used for the cache file name
* @return cache sub id
*/
uint16_t locked_determineCacheSubId();
@ -851,7 +933,7 @@ RsDistribDummyMsg *locked_getGroupDummyMsg(const std::string& grpId, const std::
std::list<GroupCache> mLocalCaches;
std::map<std::string, GroupInfo> mGroups;
uint32_t mStorePeriod, mPubPeriod;
uint32_t mStorePeriod, mPubPeriod, mArchivePeriod;
/* Message Publishing */
std::list<RsDistribSignedMsg *> mPendingPublish;
@ -888,11 +970,14 @@ RsDistribDummyMsg *locked_getGroupDummyMsg(const std::string& grpId, const std::
time_t mLastCacheDocUpdate;
bool mUpdateCacheDoc, mHistoricalCachesLoaded;
std::map<std::string, nodeCache> mCacheTable; // (cid, node)
/// contains information on cached data
pugi::xml_document mCacheDoc;
/* msg archiving */
msgArchMap mMsgArchive;
};

View file

@ -75,13 +75,15 @@ RsForums *rsForums = NULL;
/* Forums will be initially stored for 1 year
* remember 2^16 = 64K max units in store period.
* PUBPERIOD * 2^16 = max STORE PERIOD */
#define FORUM_STOREPERIOD (365*24*3600) /* 365 * 24 * 3600 - secs in a year */
#define FORUM_PUBPERIOD 600 /* 10 minutes ... (max = 455 days) */
#define FORUM_STOREPERIOD (60*24*3600) /* 60 * 24 * 3600 - secs in two months */
#define FORUM_PUBPERIOD 30 /* 30 seconds ... (max = ~23 days) */
#define FORUM_ARCHIVE_PERIOD (365*24*3600) /* 365 * 24 * 3600 - secs in a year */
p3Forums::p3Forums(uint16_t type, CacheStrapper *cs, CacheTransfer *cft,
std::string srcdir, std::string storedir, std::string forumDir)
:p3GroupDistrib(type, cs, cft, srcdir, storedir, forumDir,
CONFIG_TYPE_FORUMS, FORUM_STOREPERIOD, FORUM_PUBPERIOD),
CONFIG_TYPE_FORUMS, FORUM_STOREPERIOD, FORUM_ARCHIVE_PERIOD,
FORUM_PUBPERIOD),
mForumsDir(forumDir)
{
@ -294,6 +296,7 @@ bool p3Forums::getForumThreadMsgList(const std::string &fId, const std::string &
bool p3Forums::getForumMessage(const std::string &fId, const std::string &mId, ForumMsgInfo &info)
{
processHistoryCached(fId);
RsStackMutex stack(distribMtx); /***** STACK LOCKED MUTEX *****/
RsDistribMsg *msg = locked_getGroupMsg(fId, mId);
@ -550,7 +553,7 @@ bool p3Forums::getMessageCount(const std::string &fId, unsigned int &newCount, u
if (grpFlags & (RS_DISTRIB_ADMIN | RS_DISTRIB_SUBSCRIBED)) {
std::list<std::string> msgIds;
if (getAllMsgList(fId, msgIds)) {
if (getAllMsgList(fId, msgIds)) { // get msg ids without causing a costly cache load
RsStackMutex stack(distribMtx); /***** STACK LOCKED MUTEX *****/

View file

@ -11,7 +11,7 @@ RCC_DIR = temp/qrc
UI_DIR = temp/ui
MOC_DIR = temp/moc
#CONFIG += debug
CONFIG += debug
debug {
QMAKE_CFLAGS += -g
}

View file

@ -134,6 +134,8 @@ ForumsDialog::ForumsDialog(QWidget *parent)
m_bProcessSettings = false;
subscribeFlags = 0;
mFillthreadCount = 0;
connect( ui.forumTreeWidget, SIGNAL( treeCustomContextMenuRequested( QPoint ) ), this, SLOT( forumListCustomPopupMenu( QPoint ) ) );
connect( ui.threadTreeWidget, SIGNAL( customContextMenuRequested( QPoint ) ), this, SLOT( threadListCustomPopupMenu( QPoint ) ) );
@ -465,7 +467,9 @@ void ForumsDialog::updateDisplay()
{
std::list<std::string> forumIds;
std::list<std::string>::iterator it;
if (!rsForums)
// suspend access to forum while thread is running
if (!rsForums || (mFillthreadCount != 0))
return;
if (rsForums->forumsChanged(forumIds))
@ -770,9 +774,11 @@ void ForumsDialog::fillThreadFinished()
thread = NULL;
}
mFillthreadCount -= 1;
#ifdef DEBUG_FORUMS
std::cerr << "ForumsDialog::fillThreadFinished done" << std::endl;
#endif
}
void ForumsDialog::fillThreadProgress(int current, int count)
@ -866,6 +872,8 @@ void ForumsDialog::insertThreads()
std::cerr << "ForumsDialog::insertThreads() Start fill thread" << std::endl;
#endif
mFillthreadCount += 1;
// start thread
fillThread->start();
}
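The counter incremented here pairs with the decrement in fillThreadFinished() and the early return added to updateDisplay(), forming a simple re-entrancy guard. Reduced to its essentials (member and intent from this diff, the rest is a sketch of mine; something like QAtomicInt would be needed if these slots ever ran on different threads):

// sketch of the guard pattern used above
class FillGuard
{
public:
	FillGuard() : mFillthreadCount(0) {}
	void fillStarted()  { mFillthreadCount += 1; }   // before fillThread->start()
	void fillFinished() { mFillthreadCount -= 1; }   // in the finished slot
	bool displayAllowed() const { return mFillthreadCount == 0; }
private:
	int mFillthreadCount; // non-zero while a fill thread is running
};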

View file

@ -136,6 +136,7 @@ private:
QFont m_ForumNameFont;
int lastViewType;
std::string lastForumID;
int mFillthreadCount;
ForumsFillThread *fillThread;