fixes for message clean-up, code now working.

can be enabled with #define GXS_CLEAN_UP, default message store period set
to 1 day, cleaned every 3 minutes. Does not comply fully with spec yet (does not use msg not delete flag)


git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@6267 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
chrisparker126 2013-03-21 23:17:24 +00:00
parent 263c15d6bd
commit 3ed2e471b6
4 changed files with 52 additions and 15 deletions

View file

@ -1052,8 +1052,14 @@ int RsDataService::removeMsgs(const GxsMsgReq& msgIds)
RsStackMutex stack(mDbMutex); RsStackMutex stack(mDbMutex);
// for each group // for each group
// first get all message meta, get symmetric difference of message in vector // get for all msgs their offsets and lengths
// build a pair of start and end points to copy into the buffer // for message not contained in msg id vector
// store their data file segments in buffer
// then recalculate the retained messages
// new offsets, update db with new offsets
// replace old msg file with new file
// remove messages that were not retained from
// db
GxsMsgReq::const_iterator mit = msgIds.begin(); GxsMsgReq::const_iterator mit = msgIds.begin();
@ -1076,13 +1082,14 @@ int RsDataService::removeMsgs(const GxsMsgReq& msgIds)
std::ifstream in(oldFileName.c_str(), std::ios::binary); std::ifstream in(oldFileName.c_str(), std::ios::binary);
std::vector<char> dataBuff, newBuffer; std::vector<char> dataBuff, newBuffer;
std::vector<MsgOffset>::iterator vit; std::vector<MsgOffset>::iterator vit = msgOffsets.begin();
uint32_t maxSize = 0; uint32_t maxSize = 0;
for(; vit != msgOffsets.end(); vit++) for(; vit != msgOffsets.end(); vit++)
maxSize += vit->msgLen; maxSize += vit->msgLen;
// may be preferable to determine file len reality
// from file? corrupt db?
dataBuff.resize(maxSize); dataBuff.resize(maxSize);
newBuffer.resize(maxSize); newBuffer.resize(maxSize);
@ -1092,8 +1099,7 @@ int RsDataService::removeMsgs(const GxsMsgReq& msgIds)
in.close(); in.close();
for(std::vector<MsgOffset>::size_type i = 0; i < msgOffsets.size(); i++)
for(std::vector<MsgOffset>::size_type i = 0; msgOffsets.size(); i++)
{ {
const MsgOffset& m = msgOffsets[i]; const MsgOffset& m = msgOffsets[i];
@ -1105,14 +1111,14 @@ int RsDataService::removeMsgs(const GxsMsgReq& msgIds)
uint32_t msgLen = m.msgLen; uint32_t msgLen = m.msgLen;
up.msgId = m.msgId; up.msgId = m.msgId;
up.cv.put(KEY_NXS_FILE_OFFSET, (int32_t)msgLen); up.cv.put(KEY_NXS_FILE_OFFSET, (int32_t)newOffset);
newBuffer.insert(dataBuff.end(), dataBuff.begin()+m.msgOffset, newBuffer.insert(dataBuff.end(), dataBuff.begin()+m.msgOffset,
dataBuff.begin()+m.msgOffset+m.msgLen); dataBuff.begin()+m.msgOffset+m.msgLen);
newOffset += msgLen; newOffset += msgLen;
up.cv.put(KEY_NXS_FILE_LEN, (int32_t)newOffset); up.cv.put(KEY_NXS_FILE_LEN, (int32_t)msgLen);
// add msg update // add msg update
updates[grpId].push_back(up); updates[grpId].push_back(up);
@ -1212,8 +1218,8 @@ void RsDataService::getMessageOffsets(const RsGxsGroupId& grpId, std::vector<Msg
int32_t msgLen; int32_t msgLen;
int32_t msgOffSet; int32_t msgOffSet;
c->getString(0, msgId); c->getString(0, msgId);
msgLen = c->getInt32(1); msgOffSet = c->getInt32(1);
msgOffSet = c->getInt32(2); msgLen = c->getInt32(2);
MsgOffset offset; MsgOffset offset;
offset.msgId = msgId; offset.msgId = msgId;

View file

@ -54,6 +54,8 @@
#define GEN_EXCH_DEBUG 1 #define GEN_EXCH_DEBUG 1
#define MSG_CLEANUP_PERIOD 60*3 // 3 minutes
RsGenExchange::RsGenExchange(RsGeneralDataService *gds, RsNetworkExchangeService *ns, RsGenExchange::RsGenExchange(RsGeneralDataService *gds, RsNetworkExchangeService *ns,
RsSerialType *serviceSerialiser, uint16_t servType, RsGixs* gixs, RsSerialType *serviceSerialiser, uint16_t servType, RsGixs* gixs,
uint32_t authenPolicy, uint32_t messageStorePeriod) uint32_t authenPolicy, uint32_t messageStorePeriod)
@ -61,7 +63,8 @@ RsGenExchange::RsGenExchange(RsGeneralDataService *gds, RsNetworkExchangeService
mServType(servType), mGixs(gixs), mAuthenPolicy(authenPolicy), MESSAGE_STORE_PERIOD(messageStorePeriod), mServType(servType), mGixs(gixs), mAuthenPolicy(authenPolicy), MESSAGE_STORE_PERIOD(messageStorePeriod),
CREATE_FAIL(0), CREATE_SUCCESS(1), CREATE_FAIL_TRY_LATER(2), SIGN_MAX_ATTEMPTS(5), CREATE_FAIL(0), CREATE_SUCCESS(1), CREATE_FAIL_TRY_LATER(2), SIGN_MAX_ATTEMPTS(5),
SIGN_FAIL(0), SIGN_SUCCESS(1), SIGN_FAIL_TRY_LATER(2), SIGN_FAIL(0), SIGN_SUCCESS(1), SIGN_FAIL_TRY_LATER(2),
VALIDATE_FAIL(0), VALIDATE_SUCCESS(1), VALIDATE_FAIL_TRY_LATER(2), VALIDATE_MAX_ATTEMPTS(5) VALIDATE_FAIL(0), VALIDATE_SUCCESS(1), VALIDATE_FAIL_TRY_LATER(2), VALIDATE_MAX_ATTEMPTS(5),
mCleaning(false), mLastClean(time(NULL)), mMsgCleanUp(NULL)
{ {
mDataAccess = new RsGxsDataAccess(gds); mDataAccess = new RsGxsDataAccess(gds);
@ -122,6 +125,30 @@ void RsGenExchange::tick()
// implemented service tick function // implemented service tick function
service_tick(); service_tick();
#ifdef GXS_CLEAN_UP
time_t now = time(NULL);
if((mLastClean + MSG_CLEANUP_PERIOD < now) || mCleaning)
{
if(mMsgCleanUp)
{
if(mMsgCleanUp->clean())
{
mCleaning = false;
delete mMsgCleanUp;
mMsgCleanUp = NULL;
mLastClean = time(NULL);
}
}else
{
mMsgCleanUp = new RsGxsMessageCleanUp(mDataStore, MESSAGE_STORE_PERIOD, 1);
mCleaning = true;
}
}
#endif
} }
bool RsGenExchange::acknowledgeTokenMsg(const uint32_t& token, bool RsGenExchange::acknowledgeTokenMsg(const uint32_t& token,

View file

@ -37,8 +37,9 @@
#include "rsnxsobserver.h" #include "rsnxsobserver.h"
#include "retroshare/rsgxsservice.h" #include "retroshare/rsgxsservice.h"
#include "serialiser/rsnxsitems.h" #include "serialiser/rsnxsitems.h"
#include "rsgxsutil.h"
#define DEFAULT_MSG_STORE_PERIOD 60*60 // 1 hour #define DEFAULT_MSG_STORE_PERIOD 60*60*24 // 1 day
template<class GxsItem, typename Identity = std::string> template<class GxsItem, typename Identity = std::string>
class GxsPendingItem class GxsPendingItem
@ -698,6 +699,10 @@ private:
const uint32_t MESSAGE_STORE_PERIOD; const uint32_t MESSAGE_STORE_PERIOD;
bool mCleaning;
time_t mLastClean;
RsGxsMessageCleanUp* mMsgCleanUp;
private: private:
std::vector<RsGxsNotify*> mChanges; std::vector<RsGxsNotify*> mChanges;

View file

@ -51,13 +51,12 @@ bool RsGxsMessageCleanUp::clean()
while(!mGrpIds.empty()) while(!mGrpIds.empty())
{ {
RsGxsGroupId grpId = mGrpIds.back(); RsGxsGroupId grpId = mGrpIds.back();
mGrpIds.pop_back(); mGrpIds.pop_back();
GxsMsgReq req; GxsMsgReq req;
GxsMsgMetaResult result; GxsMsgMetaResult result;
result[grpId] = std::vector<RsGxsMsgMetaData*>(); req[grpId] = std::vector<RsGxsMessageId>();
mDs->retrieveGxsMsgMetaData(req, result); mDs->retrieveGxsMsgMetaData(req, result);
GxsMsgMetaResult::iterator mit = result.begin(); GxsMsgMetaResult::iterator mit = result.begin();
@ -72,7 +71,7 @@ bool RsGxsMessageCleanUp::clean()
for(; vit != metaV.end(); ) for(; vit != metaV.end(); )
{ {
RsGxsMsgMetaData* meta = *vit; RsGxsMsgMetaData* meta = *vit;
if(meta->mPublishTs + MESSAGE_STORE_PERIOD > now) if(meta->mPublishTs + MESSAGE_STORE_PERIOD < now)
{ {
req[grpId].push_back(meta->mMsgId); req[grpId].push_back(meta->mMsgId);
} }