diff --git a/libretroshare/src/gxs/rsgenexchange.cc b/libretroshare/src/gxs/rsgenexchange.cc index fab30783c..0f983081c 100644 --- a/libretroshare/src/gxs/rsgenexchange.cc +++ b/libretroshare/src/gxs/rsgenexchange.cc @@ -77,10 +77,10 @@ RsGenExchange::RsGenExchange(RsGeneralDataService *gds, RsNetworkExchangeService mAuthenPolicy(authenPolicy), MESSAGE_STORE_PERIOD(messageStorePeriod), mCleaning(false), - mLastClean(RSRandom::random_u32() % INTEGRITY_CHECK_PERIOD), // this helps unsynchronising the checks for the different services + mLastClean((int)time(NULL) - (int)(RSRandom::random_u32() % MSG_CLEANUP_PERIOD)), // this helps unsynchronising the checks for the different services mMsgCleanUp(NULL), mChecking(false), - mLastCheck(RSRandom::random_u32() % INTEGRITY_CHECK_PERIOD), // this helps unsynchronising the checks for the different services + mLastCheck((int)time(NULL) - (int)(RSRandom::random_u32() % INTEGRITY_CHECK_PERIOD)), // this helps unsynchronising the checks for the different services mIntegrityCheck(NULL), CREATE_FAIL(0), CREATE_SUCCESS(1), @@ -196,7 +196,7 @@ void RsGenExchange::tick() service_tick(); time_t now = time(NULL); - + if((mLastClean + MSG_CLEANUP_PERIOD < now) || mCleaning) { if(mMsgCleanUp) diff --git a/libretroshare/src/gxs/rsgxsutil.cc b/libretroshare/src/gxs/rsgxsutil.cc index 3ee2de1ad..1dd5842ea 100644 --- a/libretroshare/src/gxs/rsgxsutil.cc +++ b/libretroshare/src/gxs/rsgxsutil.cc @@ -27,9 +27,13 @@ #include "rsgxsutil.h" #include "retroshare/rsgxsflags.h" +#include "retroshare/rspeers.h" #include "pqi/pqihash.h" #include "gxs/rsgixs.h" +static const uint32_t MAX_GXS_IDS_REQUESTS_NET = 10 ; // max number of requests from cache/net (avoids killing the system!) + +//#define GXSUTIL_DEBUG 1 RsGxsMessageCleanUp::RsGxsMessageCleanUp(RsGeneralDataService* const dataService, uint32_t messageStorePeriod, uint32_t chunkSize) : mDs(dataService), MESSAGE_STORE_PERIOD(messageStorePeriod), CHUNK_SIZE(chunkSize) @@ -118,140 +122,194 @@ void RsGxsIntegrityCheck::run() bool RsGxsIntegrityCheck::check() { + // first take out all the groups + std::map grp; + mDs->retrieveNxsGrps(grp, true, true); + std::vector grpsToDel; + GxsMsgReq msgIds; + GxsMsgReq grps; - // first take out all the groups - std::map grp; - mDs->retrieveNxsGrps(grp, true, true); - std::vector grpsToDel; - GxsMsgReq msgIds; - GxsMsgReq grps; + std::set used_gxs_ids ; + std::set subscribed_groups ; - std::set subscribed_groups ; + // compute hash and compare to stored value, if it fails then simply add it + // to list + std::map::iterator git = grp.begin(); + for(; git != grp.end(); ++git) + { + RsNxsGrp* grp = git->second; + RsFileHash currHash; + pqihash pHash; + pHash.addData(grp->grp.bin_data, grp->grp.bin_len); + pHash.Complete(currHash); - // compute hash and compare to stored value, if it fails then simply add it - // to list - std::map::iterator git = grp.begin(); - for(; git != grp.end(); ++git) - { - RsNxsGrp* grp = git->second; - RsFileHash currHash; - pqihash pHash; - pHash.addData(grp->grp.bin_data, grp->grp.bin_len); - pHash.Complete(currHash); + if(currHash == grp->metaData->mHash) + { + // get all message ids of group + if (mDs->retrieveMsgIds(grp->grpId, msgIds[grp->grpId]) == 1) + { + // store the group for retrieveNxsMsgs + grps[grp->grpId]; - if(currHash == grp->metaData->mHash) - { - // get all message ids of group - if (mDs->retrieveMsgIds(grp->grpId, msgIds[grp->grpId]) == 1) - { - // store the group for retrieveNxsMsgs - grps[grp->grpId]; + if(grp->metaData->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED) + { + subscribed_groups.insert(git->first) ; - if(grp->metaData->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED) - { - subscribed_groups.insert(git->first) ; + if(!grp->metaData->mAuthorId.isNull()) + { +#ifdef GXSUTIL_DEBUG + std::cerr << "TimeStamping group authors' key ID " << grp->metaData->mAuthorId << " in group ID " << grp->grpId << std::endl; +#endif - if(!grp->metaData->mAuthorId.isNull()) - { - std::cerr << "TimeStamping group authors' key ID " << grp->metaData->mAuthorId << " in group ID " << grp->metaData->mAuthorId << std::endl; - mGixs->timeStampKey(grp->metaData->mAuthorId) ; - } - } - } - else - { - msgIds.erase(msgIds.find(grp->grpId)); - // grpsToDel.push_back(grp->grpId); - } + used_gxs_ids.insert(grp->metaData->mAuthorId) ; + } + } + } + else + { + msgIds.erase(msgIds.find(grp->grpId)); + // grpsToDel.push_back(grp->grpId); + } - } - else - { - grpsToDel.push_back(grp->grpId); - } - delete grp; - } + } + else + { + grpsToDel.push_back(grp->grpId); + } + delete grp; + } - mDs->removeGroups(grpsToDel); + mDs->removeGroups(grpsToDel); - // now messages - GxsMsgReq msgsToDel; - GxsMsgResult msgs; + // now messages + GxsMsgReq msgsToDel; + GxsMsgResult msgs; - mDs->retrieveNxsMsgs(grps, msgs, false, true); + mDs->retrieveNxsMsgs(grps, msgs, false, true); - // check msg ids and messages - GxsMsgReq::iterator msgIdsIt; - for (msgIdsIt = msgIds.begin(); msgIdsIt != msgIds.end(); ++msgIdsIt) - { - const RsGxsGroupId& grpId = msgIdsIt->first; - std::vector &msgIdV = msgIdsIt->second; + // check msg ids and messages + GxsMsgReq::iterator msgIdsIt; + for (msgIdsIt = msgIds.begin(); msgIdsIt != msgIds.end(); ++msgIdsIt) + { + const RsGxsGroupId& grpId = msgIdsIt->first; + std::vector &msgIdV = msgIdsIt->second; - std::vector::iterator msgIdIt; - for (msgIdIt = msgIdV.begin(); msgIdIt != msgIdV.end(); ++msgIdIt) - { - const RsGxsMessageId& msgId = *msgIdIt; - std::vector &nxsMsgV = msgs[grpId]; + std::vector::iterator msgIdIt; + for (msgIdIt = msgIdV.begin(); msgIdIt != msgIdV.end(); ++msgIdIt) + { + const RsGxsMessageId& msgId = *msgIdIt; + std::vector &nxsMsgV = msgs[grpId]; - std::vector::iterator nxsMsgIt; - for (nxsMsgIt = nxsMsgV.begin(); nxsMsgIt != nxsMsgV.end(); ++nxsMsgIt) - { - RsNxsMsg *nxsMsg = *nxsMsgIt; - if (nxsMsg && msgId == nxsMsg->msgId) - { - break; - } - } + std::vector::iterator nxsMsgIt; + for (nxsMsgIt = nxsMsgV.begin(); nxsMsgIt != nxsMsgV.end(); ++nxsMsgIt) + { + RsNxsMsg *nxsMsg = *nxsMsgIt; + if (nxsMsg && msgId == nxsMsg->msgId) + { + break; + } + } - if (nxsMsgIt == nxsMsgV.end()) - { - msgsToDel[grpId].push_back(msgId); - } - } - } + if (nxsMsgIt == nxsMsgV.end()) + { + msgsToDel[grpId].push_back(msgId); + } + } + } - GxsMsgResult::iterator mit = msgs.begin(); + GxsMsgResult::iterator mit = msgs.begin(); - for(; mit != msgs.end(); ++mit) - { - std::vector& msgV = mit->second; - std::vector::iterator vit = msgV.begin(); + for(; mit != msgs.end(); ++mit) + { + std::vector& msgV = mit->second; + std::vector::iterator vit = msgV.begin(); - for(; vit != msgV.end(); ++vit) - { - RsNxsMsg* msg = *vit; - RsFileHash currHash; - pqihash pHash; - pHash.addData(msg->msg.bin_data, msg->msg.bin_len); - pHash.Complete(currHash); + for(; vit != msgV.end(); ++vit) + { + RsNxsMsg* msg = *vit; + RsFileHash currHash; + pqihash pHash; + pHash.addData(msg->msg.bin_data, msg->msg.bin_len); + pHash.Complete(currHash); - if(msg->metaData == NULL || currHash != msg->metaData->mHash) - { - std::cerr << "(EE) deleting message data with wrong hash or null meta data. meta=" << (void*)msg->metaData << std::endl; - msgsToDel[msg->grpId].push_back(msg->msgId); - } - else if(!msg->metaData->mAuthorId.isNull() && subscribed_groups.find(msg->metaData->mGroupId)!=subscribed_groups.end()) - { - std::cerr << "TimeStamping message authors' key ID " << msg->metaData->mAuthorId << " in message " << msg->msgId << ", group ID " << msg->grpId<< std::endl; - mGixs->timeStampKey(msg->metaData->mAuthorId) ; - } + if(msg->metaData == NULL || currHash != msg->metaData->mHash) + { + std::cerr << "(EE) deleting message data with wrong hash or null meta data. meta=" << (void*)msg->metaData << std::endl; + msgsToDel[msg->grpId].push_back(msg->msgId); + } + else if(!msg->metaData->mAuthorId.isNull() && subscribed_groups.find(msg->metaData->mGroupId)!=subscribed_groups.end()) + { +#ifdef GXSUTIL_DEBUG + std::cerr << "TimeStamping message authors' key ID " << msg->metaData->mAuthorId << " in message " << msg->msgId << ", group ID " << msg->grpId<< std::endl; +#endif + used_gxs_ids.insert(msg->metaData->mAuthorId) ; + } - delete msg; - } - } + delete msg; + } + } - mDs->removeMsgs(msgsToDel); + mDs->removeMsgs(msgsToDel); - RsStackMutex stack(mIntegrityMutex); - mDone = true; + RsStackMutex stack(mIntegrityMutex); + mDone = true; - std::vector::iterator grpIt; - for(grpIt = grpsToDel.begin(); grpIt != grpsToDel.end(); ++grpIt) - { - mDeletedGrps.push_back(*grpIt); - } - mDeletedMsgs = msgsToDel; + std::vector::iterator grpIt; + for(grpIt = grpsToDel.begin(); grpIt != grpsToDel.end(); ++grpIt) + { + mDeletedGrps.push_back(*grpIt); + } + mDeletedMsgs = msgsToDel; +#ifdef GXSUTIL_DEBUG + std::cerr << "At end of pass, this is the list used GXS ids: " << std::endl; + std::cerr << " requesting them to GXS identity service to enforce loading." << std::endl; +#endif + + std::list connected_friends ; + rsPeers->getOnlineList(connected_friends) ; + + std::vector gxs_ids ; + + for(std::set::const_iterator it(used_gxs_ids.begin());it!=used_gxs_ids.end();++it) + { + gxs_ids.push_back(*it) ; +#ifdef GXSUTIL_DEBUG + std::cerr << " " << *it << std::endl; +#endif + } + int nb_requested_not_in_cache = 0; + +#ifdef GXSUTIL_DEBUG + std::cerr << " issuing random get on friends for non existing IDs" << std::endl; +#endif + + for(;nb_requested_not_in_cache0;) + { + uint32_t n = RSRandom::random_u32() % gxs_ids.size() ; +#ifdef GXSUTIL_DEBUG + std::cerr << " requesting ID " << gxs_ids[n] ; +#endif + + if(!mGixs->haveKey(gxs_ids[n])) // checks if we have it already in the cache (conservative way to ensure that we atually have it) + { + mGixs->requestKey(gxs_ids[n],connected_friends); + + ++nb_requested_not_in_cache ; +#ifdef GXSUTIL_DEBUG + std::cerr << " ... from cache/net" << std::endl; +#endif + } + else + std::cerr << " ... already in cache" << std::endl; + + gxs_ids[n] = gxs_ids[gxs_ids.size()-1] ; + gxs_ids.pop_back() ; + } +#ifdef GXSUTIL_DEBUG + std::cerr << " total actual cache requests: "<< nb_requested_not_in_cache << std::endl; +#endif + return true; }