From 041f989f1cfcf5fce2d54ed4095008ba3738f673 Mon Sep 17 00:00:00 2001 From: csoler Date: Tue, 25 Jul 2017 21:21:24 +0200 Subject: [PATCH] rewrote processRecvdMessages() and processRecvdGroups() in RsGenExchange. Removed mReceivedGrps and mReceivedMsgs, cleaned-up terrible branching and memory management --- libretroshare/src/gxs/rsgenexchange.cc | 425 +++++++++++++++--- libretroshare/src/gxs/rsgenexchange.h | 11 +- .../serialiser/rsturtleitem_test.cc | 29 +- 3 files changed, 388 insertions(+), 77 deletions(-) diff --git a/libretroshare/src/gxs/rsgenexchange.cc b/libretroshare/src/gxs/rsgenexchange.cc index 5be6fd3f1..7919b8223 100644 --- a/libretroshare/src/gxs/rsgenexchange.cc +++ b/libretroshare/src/gxs/rsgenexchange.cc @@ -1339,11 +1339,16 @@ bool RsGenExchange::deserializeGroupData(unsigned char *data, uint32_t size, return false; } - mReceivedGrps.push_back( - GxsPendingItem( - nxs_grp, nxs_grp->grpId,time(NULL)) ); + if(mGrpPendingValidate.find(nxs_grp->grpId) != mGrpPendingValidate.end()) + { + std::cerr << "(WW) Group " << nxs_grp->grpId << " is already pending validation. Not adding again." << std::endl; + return true; + } - if(gId) *gId = nxs_grp->grpId; + if(gId) + *gId = nxs_grp->grpId; + + mGrpPendingValidate.insert(std::make_pair(nxs_grp->grpId, GxsPendingItem(nxs_grp, nxs_grp->grpId,time(NULL)))); return true; } @@ -1576,20 +1581,18 @@ void RsGenExchange::notifyNewGroups(std::vector &groups) for(; vit != groups.end(); ++vit) { RsNxsGrp* grp = *vit; - NxsGrpPendValidVect::iterator received = std::find(mReceivedGrps.begin(), - mReceivedGrps.end(), grp->grpId); + NxsGrpPendValidVect::iterator received = mGrpPendingValidate.find(grp->grpId); // drop group if you already have them // TODO: move this to nxs layer to save bandwidth - if(received == mReceivedGrps.end()) + if(received == mGrpPendingValidate.end()) { #ifdef GEN_EXCH_DEBUG std::cerr << "RsGenExchange::notifyNewGroups() Received GrpId: " << grp->grpId; std::cerr << std::endl; #endif - GxsPendingItem gpsi(grp, grp->grpId,time(NULL)); - mReceivedGrps.push_back(gpsi); + mGrpPendingValidate.insert(std::make_pair(grp->grpId, GxsPendingItem(grp, grp->grpId,time(NULL)))); } else { @@ -1604,37 +1607,36 @@ void RsGenExchange::notifyNewMessages(std::vector& messages) { RS_STACK_MUTEX(mGenMtx) ; - std::vector::iterator vit = messages.begin(); + // store these for tick() to pick them up - // store these for tick() to pick them up - for(; vit != messages.end(); ++vit) - { - RsNxsMsg* msg = *vit; - - NxsMsgPendingVect::iterator it = - std::find(mMsgPendingValidate.begin(), mMsgPendingValidate.end(), getMsgIdPair(*msg)); - - // if we have msg already just delete it - if(it == mMsgPendingValidate.end()) + for(uint32_t i=0;igrpId; - std::cerr << " MsgId: " << msg->msgId; - std::cerr << std::endl; -#endif + RsNxsMsg* msg = messages[i]; + NxsMsgPendingVect::iterator it = mMsgPendingValidate.find(msg->msgId) ; - mReceivedMsgs.push_back(msg); - } - else - { + // if we have msg already just delete it + if(it == mMsgPendingValidate.end()) + { #ifdef GEN_EXCH_DEBUG - std::cerr << " message is already in pending validation list. dropping." << std::endl; + std::cerr << "RsGenExchange::notifyNewMessages() Received Msg: "; + std::cerr << " GrpId: " << msg->grpId; + std::cerr << " MsgId: " << msg->msgId; + std::cerr << std::endl; #endif - delete msg; - } - } + RsGxsGrpMsgIdPair id; + id.first = msg->grpId; + id.second = msg->msgId; + mMsgPendingValidate.insert(std::make_pair(msg->msgId,GxsPendingItem(msg, id,time(NULL)))); + } + else + { +#ifdef GEN_EXCH_DEBUG + std::cerr << " message is already in pending validation list. dropping." << std::endl; +#endif + delete msg; + } + } } void RsGenExchange::notifyReceivePublishKey(const RsGxsGroupId &grpId) @@ -2854,6 +2856,188 @@ void RsGenExchange::computeHash(const RsTlvBinaryData& data, RsFileHash& hash) pHash.Complete(hash); } +void RsGenExchange::processRecvdMessages() +{ + std::list messages_to_reject ; + + { + RS_STACK_MUTEX(mGenMtx) ; + + time_t now = time(NULL); + +#ifdef GEN_EXCH_DEBUG + if(!mMsgPendingValidate.empty()) + std::cerr << "processing received messages" << std::endl; +#endif + // 1 - First, make sure items metadata is deserialised, clean old failed items, and collect the groups Ids we have to check + + RsGxsGrpMetaTemporaryMap grpMetas; + + for(NxsMsgPendingVect::iterator pend_it = mMsgPendingValidate.begin();pend_it != mMsgPendingValidate.end();) + { + GxsPendingItem& gpsi = pend_it->second; + RsNxsMsg *msg = gpsi.mItem ; + + if(msg->metaData == NULL) + { + RsGxsMsgMetaData* meta = new RsGxsMsgMetaData(); + + if(msg->meta.bin_len != 0 && meta->deserialise(msg->meta.bin_data, &(msg->meta.bin_len))) + msg->metaData = meta; + else + delete meta; + } + + bool accept_new_msg = msg->metaData != NULL && acceptNewMessage(msg->metaData,msg->msg.bin_len); + + if(!accept_new_msg && mNetService != NULL) + mNetService->rejectMessage(msg->metaData->mMsgId) ; // This prevents reloading the message again at next sync. + + if(!accept_new_msg || gpsi.mFirstTryTS + VALIDATE_MAX_WAITING_TIME < now) + { + std::cerr << "Pending validation grp=" << gpsi.mId.first << ", msg=" << gpsi.mId.second << ", has exceeded validation time limit. The author's key can probably not be obtained. This is unexpected." << std::endl; + + delete gpsi.mItem; + pend_it = mMsgPendingValidate.erase(pend_it); + } + else + { + grpMetas.insert(std::make_pair(pend_it->second.mItem->grpId, (RsGxsGrpMetaData*)NULL)); + ++pend_it; + } + } + + // 2 - Retrieve the metadata for the associated groups. + + mDataStore->retrieveGxsGrpMetaData(grpMetas); + + GxsMsgReq msgIds; + RsNxsMsgDataTemporaryList msgs_to_store; + +#ifdef GEN_EXCH_DEBUG + std::cerr << " updating received messages:" << std::endl; +#endif + + // 3 - Validate each message + + for(NxsMsgPendingVect::iterator pend_it = mMsgPendingValidate.begin();pend_it != mMsgPendingValidate.end();) + { + RsNxsMsg* msg = pend_it->second.mItem; + + // (cyril) Normally we should discard posts that are older than the sync request. But that causes a problem because + // RsGxsNetService requests posts to sync by chunks of 20. So if the 20 are discarded, they will be re-synced next time, and the sync process + // will indefinitly loop on the same 20 posts. Since the posts are there already, keeping them is the least problematique way to fix this problem. + // + // uint32_t max_sync_age = ( mNetService != NULL)?( mNetService->getSyncAge(msg->metaData->mGroupId)):RS_GXS_DEFAULT_MSG_REQ_PERIOD; + // + // if(max_sync_age != 0 && msg->metaData->mPublishTs + max_sync_age < time(NULL)) + // { + // std::cerr << "(WW) not validating message " << msg->metaData->mMsgId << " in group " << msg->metaData->mGroupId << " because it is older than synchronisation limit. This message was probably sent by a friend node that does not accept sync limits already." << std::endl; + // ok = false ; + // } + +#ifdef GEN_EXCH_DEBUG + std::cerr << " deserialised info: grp id=" << meta->mGroupId << ", msg id=" << meta->mMsgId ; +#endif + std::map::iterator mit = grpMetas.find(msg->grpId); + +#ifdef GEN_EXCH_DEBUG + std::cerr << " msg info : grp id=" << msg->grpId << ", msg id=" << msg->msgId << std::endl; +#endif + // validate msg + + if(mit == grpMetas.end()) + { + std::cerr << "RsGenExchange::processRecvdMessages(): impossible situation: grp meta " << msg->grpId << " not available." << std::endl; + ++pend_it ; + continue ; + } + + RsGxsGrpMetaData *grpMeta = mit->second; + + GxsSecurity::createPublicKeysFromPrivateKeys(grpMeta->keys); // make sure we have the public keys that correspond to the private ones, as it happens. Most of the time this call does nothing. + + int validateReturn = validateMsg(msg, grpMeta->mGroupFlags, grpMeta->mSignFlags, grpMeta->keys); + +#ifdef GEN_EXCH_DEBUG + std::cerr << " grpMeta.mSignFlags: " << std::hex << grpMeta->mSignFlags << std::dec << std::endl; + std::cerr << " grpMeta.mAuthFlags: " << std::hex << grpMeta->mAuthenFlags << std::dec << std::endl; + std::cerr << " message validation result: " << (int)validateReturn << std::endl; +#endif + + if(validateReturn == VALIDATE_SUCCESS) + { + msg->metaData->mMsgStatus = GXS_SERV::GXS_MSG_STATUS_UNPROCESSED | GXS_SERV::GXS_MSG_STATUS_GUI_NEW | GXS_SERV::GXS_MSG_STATUS_GUI_UNREAD; + msgs_to_store.push_back(msg); + + std::vector &msgv = msgIds[msg->grpId]; + + if (std::find(msgv.begin(), msgv.end(), msg->msgId) == msgv.end()) + msgv.push_back(msg->msgId); + + computeHash(msg->msg, msg->metaData->mHash); + msg->metaData->recvTS = time(NULL); + +#ifdef GEN_EXCH_DEBUG + std::cerr << " new status flags: " << meta->mMsgStatus << std::endl; + std::cerr << " computed hash: " << meta->mHash << std::endl; + std::cerr << "Message received. Identity=" << msg->metaData->mAuthorId << ", from peer " << msg->PeerId() << std::endl; +#endif + + if(!msg->metaData->mAuthorId.isNull()) + mRoutingClues[msg->metaData->mAuthorId].insert(msg->PeerId()) ; + } + else if(validateReturn == VALIDATE_FAIL) + { + // In this case, we notify the network exchange service not to DL the message again, at least not yet. + +#ifdef GEN_EXCH_DEBUG + std::cerr << "Validation failed for message id " + << "msg->grpId: " << msg->grpId << ", msgId: " << msg->msgId << std::endl; +#endif + messages_to_reject.push_back(msg->msgId) ; + delete msg ; + } + else if(validateReturn == VALIDATE_FAIL_TRY_LATER) + { + ++pend_it ; + continue; + } + + // Remove the entry from mMsgPendingValidate, but do not delete msg since it's either pushed into msg_to_store or deleted in the FAIL case! + + NxsMsgPendingVect::iterator tmp = pend_it ; + ++tmp ; + mMsgPendingValidate.erase(pend_it) ; + pend_it = tmp ; + } + + if(!msgIds.empty()) + { +#ifdef GEN_EXCH_DEBUG + std::cerr << " removing existing and old messages from incoming list." << std::endl; +#endif + removeDeleteExistingMessages(msgs_to_store, msgIds); + +#ifdef GEN_EXCH_DEBUG + std::cerr << " storing remaining messages" << std::endl; +#endif + mDataStore->storeMessage(msgs_to_store); + + RsGxsMsgChange* c = new RsGxsMsgChange(RsGxsNotify::TYPE_RECEIVE, false); + c->msgChangeMap = msgIds; + mNotifications.push_back(c); + } + } + + // Done off-mutex to avoid cross deadlocks in the netservice that might call the RsGenExchange as an observer.. + + if(mNetService != NULL) + for(std::list::const_iterator it(messages_to_reject.begin());it!=messages_to_reject.end();++it) + mNetService->rejectMessage(*it) ; +} + +#ifdef OLD_VERSION void RsGenExchange::processRecvdMessages() { std::list messages_to_reject ; @@ -2913,14 +3097,16 @@ void RsGenExchange::processRecvdMessages() for(vit = mReceivedMsgs.begin(); vit != mReceivedMsgs.end(); ++vit) { RsNxsMsg* msg = *vit; - RsGxsMsgMetaData* meta = new RsGxsMsgMetaData(); - bool ok = false; + if(msg->metaData == NULL) + { + RsGxsMsgMetaData* meta = new RsGxsMsgMetaData(); - if(msg->meta.bin_len != 0) - ok = meta->deserialise(msg->meta.bin_data, &(msg->meta.bin_len)); - - msg->metaData = meta; + if(msg->meta.bin_len != 0 && meta->deserialise(msg->meta.bin_data, &(msg->meta.bin_len))) + msg->metaData = meta; + else + delete meta; + } // (cyril) Normally we should discard posts that are older than the sync request. But that causes a problem because // RsGxsNetService requests posts to sync by chunks of 20. So if the 20 are discarded, they will be re-synced next time, and the sync process @@ -2938,10 +3124,11 @@ void RsGenExchange::processRecvdMessages() std::cerr << " deserialised info: grp id=" << meta->mGroupId << ", msg id=" << meta->mMsgId ; #endif uint8_t validateReturn = VALIDATE_FAIL; - bool accept_new_msg = acceptNewMessage(meta,msg->msg.bin_len); - if(!accept_new_msg && mNetService != NULL) - mNetService->rejectMessage(meta->mMsgId) ; // This prevents reloading the message again at next sync. + bool accept_new_msg = acceptNewMessage(msg->metaData,msg->msg.bin_len); + + if(ok && !accept_new_msg && mNetService != NULL) + mNetService->rejectMessage(msg->metaData->mMsgId) ; // This prevents reloading the message again at next sync. if(ok && accept_new_msg) { @@ -3076,10 +3263,141 @@ void RsGenExchange::processRecvdMessages() for(std::list::const_iterator it(messages_to_reject.begin());it!=messages_to_reject.end();++it) mNetService->rejectMessage(*it) ; } +#endif bool RsGenExchange::acceptNewGroup(const RsGxsGrpMetaData* /*grpMeta*/ ) { return true; } bool RsGenExchange::acceptNewMessage(const RsGxsMsgMetaData* /*grpMeta*/,uint32_t /*size*/ ) { return true; } +void RsGenExchange::processRecvdGroups() +{ + RS_STACK_MUTEX(mGenMtx) ; + + if(mGrpPendingValidate.empty()) + return; + +#ifdef GEN_EXCH_DEBUG + std::cerr << "RsGenExchange::Processing received groups" << std::endl; +#endif + std::list grpIds; + RsNxsGrpDataTemporaryList grps_to_store; + + // 1 - retrieve the existing groups so as to check what's not new + std::vector existingGrpIds; + mDataStore->retrieveGroupIds(existingGrpIds); + + // 2 - go through each and every new group data and validate the signatures. + + for(NxsGrpPendValidVect::iterator vit = mGrpPendingValidate.begin(); vit != mGrpPendingValidate.end();) + { + GxsPendingItem& gpsi = vit->second; + RsNxsGrp* grp = gpsi.mItem; + +#ifdef GEN_EXCH_DEBUG + std::cerr << " processing validation for group " << meta->mGroupId << ", original attempt time: " << time(NULL) - gpsi.mFirstTryTS << " seconds ago" << std::endl; +#endif + if(grp->metaData == NULL) + { + RsGxsGrpMetaData* meta = new RsGxsGrpMetaData(); + + if(grp->meta.bin_len != 0 && meta->deserialise(grp->meta.bin_data, grp->meta.bin_len)) + grp->metaData = meta ; + else + delete meta ; + } + + // early deletion of group from the pending list if it's malformed, not accepted, or has been tried unsuccessfully for too long + + if(grp->metaData == NULL || !acceptNewGroup(grp->metaData) || gpsi.mFirstTryTS + VALIDATE_MAX_WAITING_TIME < time(NULL)) + { + NxsGrpPendValidVect::iterator tmp(vit) ; + ++tmp ; + delete grp ; + mGrpPendingValidate.erase(vit) ; + vit = tmp ; + continue; + } + + // group signature validation + + uint8_t ret = validateGrp(grp); + + if(ret == VALIDATE_SUCCESS) + { + grp->metaData->mGroupStatus = GXS_SERV::GXS_GRP_STATUS_UNPROCESSED | GXS_SERV::GXS_GRP_STATUS_UNREAD; + + computeHash(grp->grp, grp->metaData->mHash); + + // group has been validated. Let's notify the global router for the clue + + if(!grp->metaData->mAuthorId.isNull()) + { +#ifdef GEN_EXCH_DEBUG + std::cerr << "Group routage info: Identity=" << meta->mAuthorId << " from " << grp->PeerId() << std::endl; +#endif + mRoutingClues[grp->metaData->mAuthorId].insert(grp->PeerId()) ; + } + + // This has been moved here (as opposed to inside part for new groups below) because it is used to update the server TS when updates + // of grp metadata arrive. + + grp->metaData->mRecvTS = time(NULL); + + // now check if group already exists + + if(std::find(existingGrpIds.begin(), existingGrpIds.end(), grp->grpId) == existingGrpIds.end()) + { + grp->metaData->mOriginator = grp->PeerId(); + grp->metaData->mSubscribeFlags = GXS_SERV::GROUP_SUBSCRIBE_NOT_SUBSCRIBED; + + grps_to_store.push_back(grp); + grpIds.push_back(grp->grpId); + } + else + { + GroupUpdate update; + update.newGrp = grp; + mGroupUpdates.push_back(update); + } + } + else if(ret == VALIDATE_FAIL) + { +#ifdef GEN_EXCH_DEBUG + std::cerr << " failed to validate incoming meta, grpId: " << grp->grpId << ": wrong signature" << std::endl; +#endif + delete grp; + } + else if(ret == VALIDATE_FAIL_TRY_LATER) + { +#ifdef GEN_EXCH_DEBUG + std::cerr << " failed to validate incoming grp, trying again later. grpId: " << grp->grpId << std::endl; +#endif + ++vit ; + continue; + } + + // Erase entry from the list + + NxsGrpPendValidVect::iterator tmp(vit) ; + ++tmp ; + mGrpPendingValidate.erase(vit) ; + vit = tmp ; + } + + if(!grpIds.empty()) + { + RsGxsGroupChange* c = new RsGxsGroupChange(RsGxsNotify::TYPE_RECEIVE, false); + c->mGrpIdList = grpIds; + mNotifications.push_back(c); + mDataStore->storeGroup(grps_to_store); +#ifdef GEN_EXCH_DEBUG + std::cerr << " adding the following grp ids to notification: " << std::endl; + for(std::list::const_iterator it(grpIds.begin());it!=grpIds.end();++it) + std::cerr << " " << *it << std::endl; +#endif + } +} + +#ifdef OLD_VERSION void RsGenExchange::processRecvdGroups() { RS_STACK_MUTEX(mGenMtx) ; @@ -3102,27 +3420,31 @@ void RsGenExchange::processRecvdGroups() { GxsPendingItem& gpsi = *vit; RsNxsGrp* grp = gpsi.mItem; - RsGxsGrpMetaData* meta = new RsGxsGrpMetaData(); - bool deserialOk = false; - if(grp->meta.bin_len != 0) - deserialOk = meta->deserialise(grp->meta.bin_data, grp->meta.bin_len); + if(grp->metaData == NULL) + { + RsGxsGrpMetaData* meta = new RsGxsGrpMetaData(); + + if(grp->meta.bin_len != 0 && meta->deserialise(grp->meta.bin_data, grp->meta.bin_len)) + grp->metaData = meta ; + else + delete meta ; + } bool erase = true; - if(deserialOk && acceptNewGroup(meta)) + if(grp->metaData != NULL && acceptNewGroup(grp->metaData)) { #ifdef GEN_EXCH_DEBUG std::cerr << " processing validation for group " << meta->mGroupId << ", original attempt time: " << time(NULL) - gpsi.mFirstTryTS << " seconds ago" << std::endl; #endif - grp->metaData = meta; uint8_t ret = validateGrp(grp); if(ret == VALIDATE_SUCCESS) { - meta->mGroupStatus = GXS_SERV::GXS_GRP_STATUS_UNPROCESSED | GXS_SERV::GXS_GRP_STATUS_UNREAD; + grp->metaData->mGroupStatus = GXS_SERV::GXS_GRP_STATUS_UNPROCESSED | GXS_SERV::GXS_GRP_STATUS_UNREAD; - computeHash(grp->grp, meta->mHash); + computeHash(grp->grp, grp->metaData->mHash); // group has been validated. Let's notify the global router for the clue @@ -3215,6 +3537,7 @@ void RsGenExchange::processRecvdGroups() #endif } } +#endif void RsGenExchange::performUpdateValidation() { diff --git a/libretroshare/src/gxs/rsgenexchange.h b/libretroshare/src/gxs/rsgenexchange.h index 78151cdae..febfc5ff7 100644 --- a/libretroshare/src/gxs/rsgenexchange.h +++ b/libretroshare/src/gxs/rsgenexchange.h @@ -863,8 +863,8 @@ private: std::vector mReceivedMsgs; - typedef std::vector > NxsGrpPendValidVect; - NxsGrpPendValidVect mReceivedGrps; + typedef std::map > NxsGrpPendValidVect; + NxsGrpPendValidVect mGrpPendingValidate; std::vector mGrpsToPublish; typedef std::vector NxsGrpSignPendVect; @@ -885,11 +885,10 @@ private: /// authentication policy uint32_t mAuthenPolicy; - std::map > - mMsgPendingSign; + std::map > mMsgPendingSign; - std::vector > mMsgPendingValidate; - typedef std::vector > NxsMsgPendingVect; + typedef std::map > NxsMsgPendingVect; + NxsMsgPendingVect mMsgPendingValidate; bool mCleaning; time_t mLastClean; diff --git a/tests/unittests/libretroshare/serialiser/rsturtleitem_test.cc b/tests/unittests/libretroshare/serialiser/rsturtleitem_test.cc index 3590d519a..265d50e9c 100644 --- a/tests/unittests/libretroshare/serialiser/rsturtleitem_test.cc +++ b/tests/unittests/libretroshare/serialiser/rsturtleitem_test.cc @@ -42,8 +42,6 @@ RsSerialType* init_item(CompressedChunkMap& map) map._map.clear() ; for(uint32_t i=0;i<15;++i) map._map.push_back(rand()) ; - - return new RsTurtleSerialiser(); } bool operator==(const CompressedChunkMap& m1,const CompressedChunkMap& m2) { @@ -62,11 +60,10 @@ bool operator==(const RsTurtleFileMapRequestItem& it1,const RsTurtleFileMapReque return true ; } -RsSerialType* init_item(RsTurtleFileMapRequestItem& item) +void init_item(RsTurtleFileMapRequestItem& item) { item.direction = 1 ; item.tunnel_id = 0x4ff823e2 ; - return new RsTurtleSerialiser(); } bool operator==(const RsTurtleFileMapItem& it1,const RsTurtleFileMapItem& it2) { @@ -76,14 +73,13 @@ bool operator==(const RsTurtleFileMapItem& it1,const RsTurtleFileMapItem& it2) return true ; } -RsSerialType* init_item(RsTurtleFileMapItem& item) +void init_item(RsTurtleFileMapItem& item) { item.direction = 1 ; item.tunnel_id = 0xf48fe232 ; init_item(item.compressed_map) ; - return new RsTurtleSerialiser(); } -RsSerialType* init_item(RsTurtleFileDataItem& item) +void init_item(RsTurtleFileDataItem& item) { static const uint32_t S = 3456 ; item.tunnel_id = 0x33eef982 ; @@ -92,7 +88,6 @@ RsSerialType* init_item(RsTurtleFileDataItem& item) item.chunk_data = new unsigned char[S] ; for(uint32_t i=0;i