working version of msg archiving and 2 month store period

- TODO: write test for new rsitem (msghstry) and save archive msg metadata to config
 in case of premature rs shutdown

git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-msghistory@4501 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
chrisparker126 2011-07-30 16:58:39 +00:00
parent d0d13e718f
commit 836ca5017b
4 changed files with 142 additions and 67 deletions

View File

@ -237,6 +237,17 @@ std::ostream &RsDistribGrpKey::print(std::ostream &out, uint16_t indent)
return out;
}
void RsDistribMsgHstry::clear()
{
grpId.clear();
msgHstryFileHash.clear();
msgHstryFilePath.clear();
}
std::ostream& RsDistribMsgHstry::print(std::ostream& out, uint16_t indent)
{
return out;
}
/*************************************************************************/
/*************************************************************************/
@ -1050,7 +1061,7 @@ bool RsDistribSerialiser::serialise(RsItem *i, void *data, uint32_t *pktsize
{
return serialiseConfigData(dsd, data, pktsize);
}
else if(NULL != (dsd = dynamic_cast<RsDistribConfigData *>(i)))
else if(NULL != (dmh = dynamic_cast<RsDistribMsgHstry *>(i)))
{
return serialiseMsgHstry(dmh, data, pktsize);
}

View File

@ -217,10 +217,6 @@ class RsDistribMsgHstry: public RsItem
{
public:
RsDistribMsgHstry(uint16_t service_type)
:RsItem(RS_PKT_VERSION_SERVICE, service_type, RS_PKT_SUBTYPE_DISTRIB_MSG_HSTRY)
{ return; }
RsDistribMsgHstry()
:RsItem(RS_PKT_VERSION_SERVICE, RS_SERVICE_TYPE_DISTRIB, RS_PKT_SUBTYPE_DISTRIB_MSG_HSTRY)
{ return; }

View File

@ -121,11 +121,13 @@ p3GroupDistrib::p3GroupDistrib(uint16_t subtype,
mGroupsChanged = true;
mCount = 0;
mLastCacheDocUpdate = time(NULL);
mLastArchivePeriod = time(NULL);
mHistoricalCachesLoaded = false;
mUpdateArchive = false;
mOptPeriod = 15; // 30 seconds to run optimisation functions
mOwnId = AuthSSL::getAuthSSL()->OwnId();
mOwnId = AuthSSL::getAuthSSL()->OwnId();
addSerialType(new RsDistribSerialiser(getRsItemService(getType())));
addSerialType(new RsDistribSerialiser(getRsItemService(getType())));
return;
}
@ -204,12 +206,24 @@ int p3GroupDistrib::tick()
receivePubKeys();
}
bool updateArchive = false;
{
RsStackMutex stack(distribMtx);
updateArchive = (now > (time_t) (mLastArchivePeriod + mOptPeriod));
updateArchive &= mUpdateArchive;
}
if(updateArchive){
archiveRun();
mLastArchivePeriod = now;
}
// update cache document every 1 minute (should be 5 mins in production)
// after historical files have loaded and there is reason to
bool updateCacheDoc = false;
{
RsStackMutex stack(distribMtx);
updateCacheDoc = (now > (time_t) (mLastCacheDocUpdate + 10));
updateCacheDoc = (now > (time_t) (mLastCacheDocUpdate + mOptPeriod));
updateCacheDoc &= !mHistoricalCaches && mUpdateCacheDoc && mHistoricalCachesLoaded;
#ifdef DISTRIB_HISTORY_DEBUG
std::cerr << "num pending grps: " << mGrpHistPending.size() << std::endl;
@ -404,8 +418,6 @@ void p3GroupDistrib::updateCacheDocument()
pugi::xml_node messages_node;
pCacheId pCid;
int count = 0;
for(; msgIt != mMsgHistPending.end(); msgIt++)
{
@ -766,8 +778,11 @@ void p3GroupDistrib::processHistoryCached(const std::string& grpId)
{
RsStackMutex stack(distribMtx);
locked_updateCacheTableEntry(grpId, true);
}
loadArchive(grpId);
return;
}
@ -896,12 +911,21 @@ bool p3GroupDistrib::loadArchive(const std::string& grpId)
std::string filename;
msgArchMap::iterator mit;
// get archive file name (absolute)
{
RsStackMutex stack(distribMtx);
mit = mMsgArchive.find(grpId);
if(mit != mMsgArchive.end())
{
if(mit->second->loaded)
{
#ifdef DISTRIB_ARCH_DEBUG
std::cerr << "p3Distrib::loadArchive() Error, Archive already loaded" << std::endl;
#endif
return false;
}
filename = mit->second->msgFilePath;
}
else
@ -910,6 +934,8 @@ bool p3GroupDistrib::loadArchive(const std::string& grpId)
}
}
// retrieve archived msgs from file
uint32_t bioflags = BIN_FLAGS_HASH_DATA | BIN_FLAGS_READABLE;
uint32_t stream_flags = BIN_FLAGS_READABLE;
@ -921,13 +947,14 @@ bool p3GroupDistrib::loadArchive(const std::string& grpId)
if(!stream.getEncryptedItems(load))
{
#ifdef DISTRIB_ARCH_DEBUG
std::cerr << "p3Distrib::loadArchive() Error occurred trying to msg Archive Item" << std::endl;
std::cerr << "p3Distrib::loadArchive() Error occurred trying to load msg Archive Item" << std::endl;
#endif
return false;
}
RsStackMutex stack(distribMtx);
// check the file hash matches with recorded hash
if(mit->second->msgFileHash != bio->gethash())
{
#ifdef DISTRIB_ARCH_DEBUG
@ -939,10 +966,13 @@ bool p3GroupDistrib::loadArchive(const std::string& grpId)
std::list<RsItem*>::iterator it = load.begin();
RsDistribSignedMsg* rsdm = NULL;
// load into archive
for(;it!=load.end(); it++)
{
if(NULL != (rsdm = dynamic_cast<RsDistribSignedMsg*>(*it))){
mit->second->msgs.push_back(rsdm);
mit->second->msgs.insert(msgPair(rsdm->msgId,rsdm));
locked_loadMsg(rsdm, rsdm->PeerId(), true, true, true);
}
else
{
@ -951,6 +981,7 @@ bool p3GroupDistrib::loadArchive(const std::string& grpId)
}
}
// mark grp's archived msgs as loaded
mit->second->loaded = true;
return true;
@ -966,12 +997,11 @@ bool p3GroupDistrib::locked_archiveMsg(const std::string& grpId,
/* check timestamp */
time_t now = time(NULL);
uint32_t min = now - mArchivePeriod;
uint32_t max = now - mStorePeriod;
if ((msg->timestamp < min) || (msg->timestamp > max))
if (msg->timestamp < min)
{
#ifdef DISTRIB_ARCH_DEBUG
std::cerr << "p3Distrib::locked_archiveMsg() Error, Msg to old to Archive " << std::endl;
std::cerr << "p3Distrib::locked_archiveMsg() Error, Msg too old to Archive " << std::endl;
#endif
return false;
@ -982,13 +1012,21 @@ bool p3GroupDistrib::locked_archiveMsg(const std::string& grpId,
// check an entry exists already
if(it != mMsgArchive.end())
{
it->second->msgs.push_back(msg);
msgPairMap::iterator ms_it = it->second->msgs.find(msg->msgId);
if(ms_it == it->second->msgs.end()){
#ifdef DISTRIB_ARCH_DEBUG
std::cerr << "p3Distrib::locked_archiveMsg() Error, Msg already archived " << std::endl;
#endif
return false;
}
it->second->msgs.insert(msgPair(msg->msgId, msg));
it->second->toArchive = true;
}
else // if not then make one
{
RsDistribMsgArchive* msgArch = new RsDistribMsgArchive();
msgArch->msgs.push_back(msg);
msgArch->msgs.insert(msgPair(msg->msgId, msg));
msgArch->grpId = grpId;
msgArch->loaded = false;
msgArch->toArchive = true;
@ -996,6 +1034,8 @@ bool p3GroupDistrib::locked_archiveMsg(const std::string& grpId,
grpId, msgArch));
}
mUpdateArchive = true;
return true;
}
@ -1011,16 +1051,16 @@ bool p3GroupDistrib::sendArchiveToFile(RsDistribMsgArchive* msgArch)
stream_flags |= BIN_FLAGS_NO_DELETE;
BinEncryptedFileInterface *bio = new BinEncryptedFileInterface(filename.c_str(), bioflags);
pqiSSLstore *stream = new pqiSSLstore(setupSerialiser(), "CONFIG", bio, stream_flags);
pqiSSLstore stream(setupSerialiser(), "CONFIG", bio, stream_flags);
std::list<RsItem*> sendList;
std::list<RsDistribSignedMsg*>::iterator it =
msgPairMap::iterator mit =
msgArch->msgs.begin();
for(; it!=msgArch->msgs.end(); it++)
sendList.push_back(*it);
for(; mit!=msgArch->msgs.end(); mit++)
sendList.push_back(mit->second);
bool written = stream->encryptedSendItems(sendList);
bool written = stream.encryptedSendItems(sendList);
msgArch->msgFileHash = bio->gethash();
@ -1045,7 +1085,7 @@ void p3GroupDistrib::archiveRun()
for(; it!=mMsgArchive.end(); it++)
{
if(!it->second->toArchive)
if(it->second->toArchive)
{
sendArchiveToFile(it->second);
it->second->toArchive = false;
@ -1388,7 +1428,7 @@ void p3GroupDistrib::loadFileMsgs(const std::string &filename, uint16_t cacheSub
if ((newMsg = dynamic_cast<RsDistribSignedMsg *>(item)))
{
grpId = newMsg->grpId;
if(loadMsg(newMsg, src, local, historical))
if(loadMsg(newMsg, src, local, historical, false))
{
if(cache)
{
@ -1671,7 +1711,7 @@ bool p3GroupDistrib::loadGroupKey(RsDistribGrpKey *newKey, bool historical)
}
bool p3GroupDistrib::loadMsg(RsDistribSignedMsg *newMsg, const std::string &src, bool local, bool historical)
bool p3GroupDistrib::loadMsg(RsDistribSignedMsg *newMsg, const std::string &src, bool local, bool historical, bool archive)
{
/****************** check the msg ******************/
/* Do the most likely checks to fail first....
@ -1774,7 +1814,11 @@ bool p3GroupDistrib::loadMsg(RsDistribSignedMsg *newMsg, const std::string &src,
return false;
}
if (!locked_checkDistribMsg(git->second, msg))
bool archived = false;
// checks timestamp of msg to see if it is valid
// ignore this if this is an archived msg load
if (!locked_checkDistribMsg(git->second, msg) && !archive)
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadMsg() check failed" << std::endl;
@ -1783,8 +1827,11 @@ bool p3GroupDistrib::loadMsg(RsDistribSignedMsg *newMsg, const std::string &src,
// out of range, archive if subscribed and if archiving
// successful allow to continue loading
if(!locked_archiveMsg(newMsg->grpId, newMsg)
&& (git->second.flags & RS_DISTRIB_SUBSCRIBED)){
if(git->second.flags & RS_DISTRIB_SUBSCRIBED)
archived = locked_archiveMsg(newMsg->grpId, newMsg);
if(!archived)
{
delete newMsg;
delete msg;
return false;
@ -1839,7 +1886,9 @@ bool p3GroupDistrib::loadMsg(RsDistribSignedMsg *newMsg, const std::string &src,
std::cerr << "p3GroupDistrib::loadMsg() Deleted Original Msg (No Publish)";
std::cerr << std::endl;
#endif
delete newMsg;
if(!archived)
delete newMsg;
}
@ -1849,7 +1898,7 @@ bool p3GroupDistrib::loadMsg(RsDistribSignedMsg *newMsg, const std::string &src,
return true;
}
bool p3GroupDistrib::locked_loadMsg(RsDistribSignedMsg *newMsg, const std::string &src, bool local, bool historical)
bool p3GroupDistrib::locked_loadMsg(RsDistribSignedMsg *newMsg, const std::string &src, bool local, bool historical, bool archive)
{
/****************** check the msg ******************/
/* Do the most likely checks to fail first....
@ -1869,7 +1918,6 @@ bool p3GroupDistrib::locked_loadMsg(RsDistribSignedMsg *newMsg, const std::strin
std::cerr << "----------------------" << std::endl;
#endif
/* Check if it exists already */
/* find group */
@ -1885,7 +1933,6 @@ bool p3GroupDistrib::locked_loadMsg(RsDistribSignedMsg *newMsg, const std::strin
return false;
}
/****************** check the msg ******************/
/* check for duplicate message, do this first to ensure minimal signature validations.
* therefore, duplicateMsg... could potentially be called on a dodgey msg (not a big problem!)
@ -1916,26 +1963,24 @@ bool p3GroupDistrib::locked_loadMsg(RsDistribSignedMsg *newMsg, const std::strin
return false;
}
void *temp_ptr = newMsg->packet.bin_data;
int temp_len = newMsg->packet.bin_len;
void *temp_ptr = newMsg->packet.bin_data;
int temp_len = newMsg->packet.bin_len;
if(git->second.grpFlags & RS_DISTRIB_ENCRYPTED){
void *out_data = NULL;
int out_len = 0;
void *out_data = NULL;
int out_len = 0;
if(decrypt(out_data, out_len, newMsg->packet.bin_data, newMsg->packet.bin_len, newMsg->grpId)){
newMsg->packet.TlvShallowClear();
newMsg->packet.setBinData(out_data, out_len);
delete[] (unsigned char*) out_data;
if(decrypt(out_data, out_len, newMsg->packet.bin_data, newMsg->packet.bin_len, newMsg->grpId)){
newMsg->packet.TlvShallowClear();
newMsg->packet.setBinData(out_data, out_len);
delete[] (unsigned char*) out_data;
}else{
if((out_data != NULL) && (out_len != 0))
delete[] (unsigned char*) out_data;
if((out_data != NULL) && (out_len != 0))
delete[] (unsigned char*) out_data;
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadMsg() Failed to decrypt" << std::endl;
std::cerr << std::endl;
std::cerr << "p3GroupDistrib::loadMsg() Failed to decrypt" << std::endl;
std::cerr << std::endl;
#endif
return false;
}
@ -1954,15 +1999,28 @@ bool p3GroupDistrib::locked_loadMsg(RsDistribSignedMsg *newMsg, const std::strin
return false;
}
if (!locked_checkDistribMsg(git->second, msg))
bool archived = false;
// checks timestamp of msg to see if it is valid
// ignore this if this is an archived msg load
if (!locked_checkDistribMsg(git->second, msg) && !archive)
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadMsg() check failed" << std::endl;
std::cerr << std::endl;
#endif
delete newMsg;
delete msg;
return false;
// out of range, archive if subscribed and if archiving
// successful allow to continue loading
if(git->second.flags & RS_DISTRIB_SUBSCRIBED)
archived = locked_archiveMsg(newMsg->grpId, newMsg);
if(!archived)
{
delete newMsg;
delete msg;
return false;
}
}
/* accept message */
@ -1983,7 +2041,6 @@ bool p3GroupDistrib::locked_loadMsg(RsDistribSignedMsg *newMsg, const std::strin
std::cerr << std::endl;
#endif
/* Callback for any derived classes to play with */
locked_eventNewMsg(&(git->second), msg, src, historical);
@ -1996,10 +2053,10 @@ bool p3GroupDistrib::locked_loadMsg(RsDistribSignedMsg *newMsg, const std::strin
std::cerr << std::endl;
#endif
if(git->second.grpFlags & RS_DISTRIB_ENCRYPTED){
newMsg->packet.TlvClear();
newMsg->packet.setBinData(temp_ptr, temp_len);
}
if(git->second.grpFlags & RS_DISTRIB_ENCRYPTED){
newMsg->packet.TlvClear();
newMsg->packet.setBinData(temp_ptr, temp_len);
}
locked_toPublishMsg(newMsg);
}
@ -2014,7 +2071,9 @@ bool p3GroupDistrib::locked_loadMsg(RsDistribSignedMsg *newMsg, const std::strin
std::cerr << "p3GroupDistrib::loadMsg() Deleted Original Msg (No Publish)";
std::cerr << std::endl;
#endif
delete newMsg;
if(!archived)
delete newMsg;
}
@ -2933,7 +2992,7 @@ bool p3GroupDistrib::loadList(std::list<RsItem *>& load)
else if ((newMsg = dynamic_cast<RsDistribSignedMsg *>(*lit)))
{
newMsg->PeerId(mOwnId);
loadMsg(newMsg, mOwnId, false, false); /* false so it'll pushed to PendingPublish list */
loadMsg(newMsg, mOwnId, false, false, false); /* false so it'll pushed to PendingPublish list */
}
else if ((msgHstry = dynamic_cast<RsDistribMsgHstry*>(*lit)))
{
@ -3762,7 +3821,7 @@ std::string p3GroupDistrib::publishMsg(RsDistribMsg *msg, bool personalSign)
*/
signedMsg->PeerId(mOwnId);
loadMsg(signedMsg, mOwnId, false, false);
loadMsg(signedMsg, mOwnId, false, false, false);
/* done */
return msgId;

View File

@ -313,13 +313,16 @@ class CacheDataPending
bool mHistorical;
};
typedef std::pair<std::string, RsDistribSignedMsg*> msgPair;
typedef std::map<std::string, RsDistribSignedMsg*> msgPairMap;
class RsDistribMsgArchive
{
public:
RsDistribMsgArchive();
std::list<RsDistribSignedMsg*> msgs;
std::map<std::string , RsDistribSignedMsg*> msgs;
std::string grpId;
std::string msgFileHash;
std::string msgFilePath;
@ -437,11 +440,13 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu
/*!
* encrypts and saves cache file
* @return false if cache file does not save
*/
bool locked_saveHistoryCacheFile();
/*!
* decrypte and save cache file
* decrypts and loads cache file
* @return false if loading fails
*/
bool locked_loadHistoryCacheFile();
@ -455,7 +460,7 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu
void locked_removeCacheTableEntry(const pCacheId& pCid);
/*!
*
* archives a msg for given group Id
* @param grpId
* @param msg
* @return
@ -463,8 +468,8 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu
bool locked_archiveMsg(const std::string& grpId, RsDistribSignedMsg* msg);
/*!
*
* @param grpId archive msgs to load
* loads archived msg for a group
* @param grpId the grpId of group to load archive msgs for
* @return false if there are no archived msgs
*/
bool loadArchive(const std::string& grpId);
@ -528,8 +533,10 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu
* @param msg msg to loaded
* @param src src of msg (peer id)
* @param local is this a local cache msg (your msg)
* @param historical
* @param archive
*/
bool loadMsg(RsDistribSignedMsg *msg, const std::string &src, bool local, bool historical);
bool loadMsg(RsDistribSignedMsg *msg, const std::string &src, bool local, bool historical, bool archive = false);
/*!
* msg is loaded to its group and republished,
@ -538,7 +545,7 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu
* @param src src of msg (peer id)
* @param local is this a local cache msg (your msg)
*/
bool locked_loadMsg(RsDistribSignedMsg *newMsg, const std::string &src, bool local, bool historical);
bool locked_loadMsg(RsDistribSignedMsg *newMsg, const std::string &src, bool local, bool historical, bool archive = false);
/*!
* adds newgrp to grp set, GroupInfo type created and stored
@ -936,7 +943,7 @@ RsDistribDummyMsg *locked_getGroupDummyMsg(const std::string& grpId, const std::
std::list<GroupCache> mLocalCaches;
std::map<std::string, GroupInfo> mGroups;
uint32_t mStorePeriod, mPubPeriod, mArchivePeriod;
uint32_t mStorePeriod, mPubPeriod, mArchivePeriod, mOptPeriod;
/* Message Publishing */
std::list<RsDistribSignedMsg *> mPendingPublish;
@ -981,6 +988,8 @@ RsDistribDummyMsg *locked_getGroupDummyMsg(const std::string& grpId, const std::
/* msg archiving */
msgArchMap mMsgArchive;
uint32_t mLastArchivePeriod;
bool mUpdateArchive;
};