diff --git a/libretroshare/src/gxs/rsdataservice.cc b/libretroshare/src/gxs/rsdataservice.cc index 47ff70b10..14a200ccc 100644 --- a/libretroshare/src/gxs/rsdataservice.cc +++ b/libretroshare/src/gxs/rsdataservice.cc @@ -26,6 +26,8 @@ #include +#include +#include #include "rsdataservice.h" @@ -166,6 +168,10 @@ RsDataService::RsDataService(const std::string &serviceDir, const std::string &d // for retrieving actual grp data grpColumns.push_back(KEY_GRP_ID); grpColumns.push_back(KEY_NXS_FILE); grpColumns.push_back(KEY_NXS_FILE_OFFSET); grpColumns.push_back(KEY_NXS_FILE_LEN); grpColumns.push_back(KEY_NXS_META); + + // for retrieving msg offsets + mMsgOffSetColumns.push_back(KEY_MSG_ID); mMsgOffSetColumns.push_back(KEY_NXS_FILE_OFFSET); + mMsgOffSetColumns.push_back(KEY_NXS_FILE_LEN); } RsDataService::~RsDataService(){ @@ -1020,24 +1026,216 @@ int RsDataService::removeGroups(const std::vector &grpIds) int RsDataService::updateGroupMetaData(GrpLocMetaData &meta) { + RsStackMutex stack(mDbMutex); RsGxsGroupId& grpId = meta.grpId; return mDb->sqlUpdate(GRP_TABLE_NAME, KEY_GRP_ID+ "='" + grpId + "'", meta.val) ? 1 : 0; } int RsDataService::updateMessageMetaData(MsgLocMetaData &metaData) { + RsStackMutex stack(mDbMutex); RsGxsGroupId& grpId = metaData.msgId.first; RsGxsMessageId& msgId = metaData.msgId.second; return mDb->sqlUpdate(MSG_TABLE_NAME, KEY_GRP_ID+ "='" + grpId + "' AND " + KEY_MSG_ID + "='" + msgId + "'", metaData.val) ? 1 : 0; } +MsgOffset offSetAccum(const MsgOffset& x, const MsgOffset& y) +{ + MsgOffset m; + m.msgLen = y.msgLen + x.msgLen; + return m; +} int RsDataService::removeMsgs(const GxsMsgReq& msgIds) { - return 0; + RsStackMutex stack(mDbMutex); + + // for each group + // first get all message meta, get symmetric difference of message in vector + // build a pair of start and end points to copy into the buffer + + GxsMsgReq::const_iterator mit = msgIds.begin(); + + + for(; mit != msgIds.end(); mit++) + { + MsgUpdates updates; + const std::vector& msgIdV = mit->second; + const RsGxsGroupId& grpId = mit->first; + + GxsMsgReq reqIds; + reqIds.insert(std::make_pair(grpId, std::vector() )); + + // can get offsets for each file + std::vector msgOffsets; + getMessageOffsets(grpId, msgOffsets); + + std::string oldFileName = mServiceDir + "/" + grpId + "-msgs"; + std::string newFileName = mServiceDir + "/" + grpId + "-msgs-temp"; + std::ifstream in(oldFileName.c_str(), std::ios::binary); + std::vector dataBuff, newBuffer; + + std::vector::iterator vit; + + uint32_t maxSize = 0; + for(; vit != msgOffsets.end(); vit++) + maxSize += vit->msgLen; + + + dataBuff.resize(maxSize); + newBuffer.resize(maxSize); + + dataBuff.insert(dataBuff.end(), + std::istreambuf_iterator(in), + std::istreambuf_iterator()); + + in.close(); + + + for(std::vector::size_type i = 0; msgOffsets.size(); i++) + { + const MsgOffset& m = msgOffsets[i]; + + uint32_t newOffset = 0; + if(std::find(msgIdV.begin(), msgIdV.end(), m.msgId) == msgIdV.end()) + { + MsgUpdate up; + + uint32_t msgLen = m.msgLen; + + up.msgId = m.msgId; + up.cv.put(KEY_NXS_FILE_OFFSET, (int32_t)msgLen); + + newBuffer.insert(dataBuff.end(), dataBuff.begin()+m.msgOffset, + dataBuff.begin()+m.msgOffset+m.msgLen); + + newOffset += msgLen; + + up.cv.put(KEY_NXS_FILE_LEN, (int32_t)newOffset); + + // add msg update + updates[grpId].push_back(up); + } + } + + std::ofstream out(newFileName.c_str(), std::ios::binary); + + std::copy(newBuffer.begin(), newBuffer.end(), + std::ostreambuf_iterator(out)); + + out.close(); + + // now update the new positions in db + updateMessageEntries(updates); + + // then delete removed messages + GxsMsgReq msgsToDelete; + msgsToDelete[grpId] = msgIdV; + removeMessageEntries(msgsToDelete); + + // now replace old file location with new file + remove(oldFileName.c_str()); + RsDirUtil::renameFile(newFileName, oldFileName); + } + + return 1; } + + +bool RsDataService::updateMessageEntries(const MsgUpdates& updates) +{ + // start a transaction + bool ret = mDb->execSQL("BEGIN;"); + + MsgUpdates::const_iterator mit = updates.begin(); + + for(; mit != updates.end(); mit++) + { + + const RsGxsGroupId& grpId = mit->first; + const std::vector& updateV = mit->second; + std::vector::const_iterator vit = updateV.begin(); + + for(; vit != updateV.end(); vit++) + { + const MsgUpdate& update = *vit; + mDb->sqlUpdate(MSG_TABLE_NAME, KEY_GRP_ID+ "='" + grpId + + "' AND " + KEY_MSG_ID + "='" + update.msgId + "'", update.cv); + } + } + + ret &= mDb->execSQL("COMMIT;"); + + return ret; +} + +bool RsDataService::removeMessageEntries(const GxsMsgReq& msgIds) +{ + // start a transaction + bool ret = mDb->execSQL("BEGIN;"); + + GxsMsgReq::const_iterator mit = msgIds.begin(); + + for(; mit != msgIds.end(); mit++) + { + const RsGxsGroupId& grpId = mit->first; + const std::vector& msgsV = mit->second; + std::vector::const_iterator vit = msgsV.begin(); + + for(; vit != msgsV.end(); vit++) + { + const RsGxsMessageId& msgId = *vit; + mDb->sqlDelete(MSG_TABLE_NAME, KEY_GRP_ID+ "='" + grpId + + "' AND " + KEY_MSG_ID + "='" + msgId + "'", ""); + } + } + + ret &= mDb->execSQL("COMMIT;"); + + return ret; +} + +void RsDataService::getMessageOffsets(const RsGxsGroupId& grpId, std::vector& offsets) +{ + + RetroCursor* c = mDb->sqlQuery(MSG_TABLE_NAME, mMsgOffSetColumns, KEY_GRP_ID+ "='" + grpId + "'", ""); + + if(c) + { + bool valid = c->moveToFirst(); + + while(valid) + { + RsGxsMessageId msgId; + int32_t msgLen; + int32_t msgOffSet; + c->getString(0, msgId); + msgLen = c->getInt32(1); + msgOffSet = c->getInt32(2); + + MsgOffset offset; + offset.msgId = msgId; + offset.msgLen = msgLen; + offset.msgOffset = msgOffSet; + offsets.push_back(offset); + + valid = c->moveToNext(); + } + delete c; + } +} + +void RsDataService::lockStore() +{ + mDbMutex.lock(); +} + +void RsDataService::unlockStore() +{ + mDbMutex.unlock(); +} uint32_t RsDataService::cacheSize() const { return 0; } diff --git a/libretroshare/src/gxs/rsdataservice.h b/libretroshare/src/gxs/rsdataservice.h index 8bfe97856..cf19bd07e 100644 --- a/libretroshare/src/gxs/rsdataservice.h +++ b/libretroshare/src/gxs/rsdataservice.h @@ -29,6 +29,24 @@ #include "gxs/rsgds.h" #include "util/retrodb.h" +class MsgOffset +{ +public: + + MsgOffset() : msgOffset(0), msgLen(0) {} + RsGxsMessageId msgId; + uint32_t msgOffset, msgLen; +}; + +class MsgUpdate +{ +public: + + MsgUpdate(){} + MsgUpdate(const MsgUpdate& ){} + RsGxsMessageId msgId; + ContentValue cv; +}; class RsDataService : public RsGeneralDataService { @@ -123,6 +141,18 @@ public: */ int updateGroupMetaData(GrpLocMetaData& meta); + /*! + * Use to lock store when needing to ensure Db contents has not change + * @warning ensure you call unlock or you could cause a deadlock + * @see RsDataService::unlockStore() + */ + void lockStore(); + + /*! + * Use to unlock store after locking + * @see RsDataService::lockStore() + */ + void unlockStore(); /*! * Completely clear out data stored in @@ -132,7 +162,6 @@ public: */ int resetDataStore(); - bool validSize(RsNxsMsg* msg) const; bool validSize(RsNxsGrp* grp) const; @@ -190,6 +219,26 @@ private: */ void initialise(); + /*! + * Remove entries for data base + * @param msgIds + */ + bool removeMessageEntries(const GxsMsgReq& msgIds); + + typedef std::map > MsgUpdates; + + /*! + * Update messages entries with new values + * @param msgIds + * @param cv contains values to update message entries with + */ + bool updateMessageEntries(const MsgUpdates& updates); + + +private: + + void getMessageOffsets(const RsGxsGroupId& grpId, std::vector& msgOffsets); + private: @@ -199,6 +248,7 @@ private: std::list msgColumns; std::list msgMetaColumns; + std::list mMsgOffSetColumns; std::list grpColumns; std::list grpMetaColumns; diff --git a/libretroshare/src/gxs/rsgenexchange.cc b/libretroshare/src/gxs/rsgenexchange.cc index 4c5692979..071a08cdc 100644 --- a/libretroshare/src/gxs/rsgenexchange.cc +++ b/libretroshare/src/gxs/rsgenexchange.cc @@ -55,9 +55,10 @@ #define GEN_EXCH_DEBUG 1 RsGenExchange::RsGenExchange(RsGeneralDataService *gds, RsNetworkExchangeService *ns, - RsSerialType *serviceSerialiser, uint16_t servType, RsGixs* gixs, uint32_t authenPolicy) + RsSerialType *serviceSerialiser, uint16_t servType, RsGixs* gixs, + uint32_t authenPolicy, uint32_t messageStorePeriod) : mGenMtx("GenExchange"), mDataStore(gds), mNetService(ns), mSerialiser(serviceSerialiser), - mServType(servType), mGixs(gixs), mAuthenPolicy(authenPolicy), + mServType(servType), mGixs(gixs), mAuthenPolicy(authenPolicy), MESSAGE_STORE_PERIOD(messageStorePeriod), CREATE_FAIL(0), CREATE_SUCCESS(1), CREATE_FAIL_TRY_LATER(2), SIGN_MAX_ATTEMPTS(5), SIGN_FAIL(0), SIGN_SUCCESS(1), SIGN_FAIL_TRY_LATER(2), VALIDATE_FAIL(0), VALIDATE_SUCCESS(1), VALIDATE_FAIL_TRY_LATER(2), VALIDATE_MAX_ATTEMPTS(5) diff --git a/libretroshare/src/gxs/rsgenexchange.h b/libretroshare/src/gxs/rsgenexchange.h index 69331fecc..e402f46a0 100644 --- a/libretroshare/src/gxs/rsgenexchange.h +++ b/libretroshare/src/gxs/rsgenexchange.h @@ -38,6 +38,8 @@ #include "retroshare/rsgxsservice.h" #include "serialiser/rsnxsitems.h" +#define DEFAULT_MSG_STORE_PERIOD 60*60 // 1 hour + template class GxsPendingItem { @@ -125,7 +127,8 @@ public: * @param authenPolicy This determines the authentication used for verfying authorship of msgs and groups */ RsGenExchange(RsGeneralDataService* gds, RsNetworkExchangeService* ns, - RsSerialType* serviceSerialiser, uint16_t mServType, RsGixs* gixs = NULL, uint32_t authenPolicy = 0); + RsSerialType* serviceSerialiser, uint16_t mServType, RsGixs* gixs = NULL, uint32_t authenPolicy = 0, + uint32_t messageStorePeriod = DEFAULT_MSG_STORE_PERIOD); virtual ~RsGenExchange(); @@ -693,6 +696,7 @@ private: typedef std::vector > NxsMsgPendingVect; + const uint32_t MESSAGE_STORE_PERIOD; private: diff --git a/libretroshare/src/gxs/rsgxsutil.cc b/libretroshare/src/gxs/rsgxsutil.cc new file mode 100644 index 000000000..2e26387f8 --- /dev/null +++ b/libretroshare/src/gxs/rsgxsutil.cc @@ -0,0 +1,92 @@ +/* + * libretroshare/src/gxs: rsgxsutil.cc + * + * RetroShare C++ Interface. Generic routines that are useful in GXS + * + * Copyright 2013-2013 by Christopher Evi-Parker + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License Version 2 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 + * USA. + * + * Please report all bugs and problems to "retroshare@lunamutt.com". + * + */ + +#include "rsgxsutil.h" + + +RsGxsMessageCleanUp::RsGxsMessageCleanUp(RsGeneralDataService* const dataService, uint32_t messageStorePeriod, uint32_t chunkSize) +: mDs(dataService), MESSAGE_STORE_PERIOD(messageStorePeriod), CHUNK_SIZE(chunkSize) +{ + + std::map grpMeta; + mDs->retrieveGxsGrpMetaData(grpMeta); + + std::map::iterator cit = grpMeta.begin(); + + for(;cit != grpMeta.end(); cit++) + { + mGrpIds.push_back(cit->first); + delete cit->second; + } +} + + +bool RsGxsMessageCleanUp::clean() +{ + int i = 0; + + time_t now = time(NULL); + + while(!mGrpIds.empty()) + { + + RsGxsGroupId grpId = mGrpIds.back(); + mGrpIds.pop_back(); + GxsMsgReq req; + GxsMsgMetaResult result; + + result[grpId] = std::vector(); + mDs->retrieveGxsMsgMetaData(req, result); + + GxsMsgMetaResult::iterator mit = result.begin(); + + req.clear(); + + for(; mit != result.end(); mit++) + { + std::vector& metaV = mit->second; + std::vector::iterator vit = metaV.begin(); + + for(; vit != metaV.end(); ) + { + RsGxsMsgMetaData* meta = *vit; + if(meta->mPublishTs + MESSAGE_STORE_PERIOD > now) + { + req[grpId].push_back(meta->mMsgId); + } + + delete meta; + vit = metaV.erase(vit); + } + } + + mDs->removeMsgs(req); + + i++; + if(i > CHUNK_SIZE) break; + } + + return mGrpIds.empty(); +} diff --git a/libretroshare/src/gxs/rsgxsutil.h b/libretroshare/src/gxs/rsgxsutil.h index aa76af55a..18c4a663c 100644 --- a/libretroshare/src/gxs/rsgxsutil.h +++ b/libretroshare/src/gxs/rsgxsutil.h @@ -27,6 +27,8 @@ #define GXSUTIL_H_ #include +#include "serialiser/rsnxsitems.h" +#include "rsgds.h" /*! * Handy function for cleaning out meta result containers @@ -55,5 +57,41 @@ inline RsGxsGrpMsgIdPair getMsgIdPair(RsGxsMsgItem& msg) return RsGxsGrpMsgIdPair(std::make_pair(msg.meta.mGroupId, msg.meta.mMsgId)); } +/*! + * Does message clean up based on individual group expirations first + * if avialable. If not then deletion s + */ +class RsGxsMessageCleanUp : public RsThread +{ +public: + + /*! + * + * @param dataService + * @param mGroupTS + * @param chunkSize + * @param sleepPeriod + */ + RsGxsMessageCleanUp(RsGeneralDataService* const dataService, uint32_t messageStorePeriod, uint32_t chunkSize); + + /*! + * On construction this should be called to progress deletions + * Deletion will process by chunk size + * @return true if no more messages to delete, false otherwise + */ + bool clean(); + + /*! + * TODO: Rather manual progressions consider running through a thread + */ + void run(){} + +private: + + RsGeneralDataService* const mDs; + const uint32_t MESSAGE_STORE_PERIOD, CHUNK_SIZE; + std::vector mGrpIds; +}; + #endif /* GXSUTIL_H_ */ diff --git a/libretroshare/src/libretroshare.pro b/libretroshare/src/libretroshare.pro index 335bc4593..a21b1f202 100644 --- a/libretroshare/src/libretroshare.pro +++ b/libretroshare/src/libretroshare.pro @@ -651,6 +651,7 @@ gxs { util/contentvalue.cc \ gxs/gxssecurity.cc \ gxs/gxstokenqueue.cc \ + gxs/rsgxsutil.cc # Identity Service HEADERS += retroshare/rsidentity.h \