diff --git a/libretroshare/src/gxs/rsgenexchange.cc b/libretroshare/src/gxs/rsgenexchange.cc index 7919b8223..7028d6295 100644 --- a/libretroshare/src/gxs/rsgenexchange.cc +++ b/libretroshare/src/gxs/rsgenexchange.cc @@ -2890,8 +2890,8 @@ void RsGenExchange::processRecvdMessages() bool accept_new_msg = msg->metaData != NULL && acceptNewMessage(msg->metaData,msg->msg.bin_len); - if(!accept_new_msg && mNetService != NULL) - mNetService->rejectMessage(msg->metaData->mMsgId) ; // This prevents reloading the message again at next sync. + if(!accept_new_msg) + messages_to_reject.push_back(msg->metaData->mMsgId); // This prevents reloading the message again at next sync. if(!accept_new_msg || gpsi.mFirstTryTS + VALIDATE_MAX_WAITING_TIME < now) { @@ -3037,234 +3037,6 @@ void RsGenExchange::processRecvdMessages() mNetService->rejectMessage(*it) ; } -#ifdef OLD_VERSION -void RsGenExchange::processRecvdMessages() -{ - std::list messages_to_reject ; - - { - RS_STACK_MUTEX(mGenMtx) ; - - time_t now = time(NULL); - -#ifdef GEN_EXCH_DEBUG - if(!mMsgPendingValidate.empty()) - std::cerr << "processing received messages" << std::endl; -#endif - NxsMsgPendingVect::iterator pend_it = mMsgPendingValidate.begin(); - - for(; pend_it != mMsgPendingValidate.end();) - { - GxsPendingItem& gpsi = *pend_it; - - if(gpsi.mFirstTryTS + VALIDATE_MAX_WAITING_TIME < now) - { - std::cerr << "Pending validation grp=" << gpsi.mId.first << ", msg=" << gpsi.mId.second << ", has exceeded validation time limit. The author's key can probably not be obtained. This is unexpected." << std::endl; - - delete gpsi.mItem; - pend_it = mMsgPendingValidate.erase(pend_it); - } - else - { -#ifdef GEN_EXCH_DEBUG - std::cerr << " movign to recvd." << std::endl; -#endif - mReceivedMsgs.push_back(gpsi.mItem); - ++pend_it; - } - } - - if(mReceivedMsgs.empty()) - return; - - std::vector::iterator vit = mReceivedMsgs.begin(); - GxsMsgReq msgIds; - RsNxsMsgDataTemporaryList msgs; - RsGxsGrpMetaTemporaryMap grpMetas; - - // coalesce group meta retrieval for performance - for(; vit != mReceivedMsgs.end(); ++vit) - { - RsNxsMsg* msg = *vit; - grpMetas.insert(std::make_pair(msg->grpId, (RsGxsGrpMetaData*)NULL)); - } - - mDataStore->retrieveGxsGrpMetaData(grpMetas); - -#ifdef GEN_EXCH_DEBUG - std::cerr << " updating received messages:" << std::endl; -#endif - for(vit = mReceivedMsgs.begin(); vit != mReceivedMsgs.end(); ++vit) - { - RsNxsMsg* msg = *vit; - - if(msg->metaData == NULL) - { - RsGxsMsgMetaData* meta = new RsGxsMsgMetaData(); - - if(msg->meta.bin_len != 0 && meta->deserialise(msg->meta.bin_data, &(msg->meta.bin_len))) - msg->metaData = meta; - else - delete meta; - } - - // (cyril) Normally we should discard posts that are older than the sync request. But that causes a problem because - // RsGxsNetService requests posts to sync by chunks of 20. So if the 20 are discarded, they will be re-synced next time, and the sync process - // will indefinitly loop on the same 20 posts. Since the posts are there already, keeping them is the least problematique way to fix this problem. - // - // uint32_t max_sync_age = ( mNetService != NULL)?( mNetService->getSyncAge(msg->metaData->mGroupId)):RS_GXS_DEFAULT_MSG_REQ_PERIOD; - // - // if(max_sync_age != 0 && msg->metaData->mPublishTs + max_sync_age < time(NULL)) - // { - // std::cerr << "(WW) not validating message " << msg->metaData->mMsgId << " in group " << msg->metaData->mGroupId << " because it is older than synchronisation limit. This message was probably sent by a friend node that does not accept sync limits already." << std::endl; - // ok = false ; - // } - -#ifdef GEN_EXCH_DEBUG - std::cerr << " deserialised info: grp id=" << meta->mGroupId << ", msg id=" << meta->mMsgId ; -#endif - uint8_t validateReturn = VALIDATE_FAIL; - - bool accept_new_msg = acceptNewMessage(msg->metaData,msg->msg.bin_len); - - if(ok && !accept_new_msg && mNetService != NULL) - mNetService->rejectMessage(msg->metaData->mMsgId) ; // This prevents reloading the message again at next sync. - - if(ok && accept_new_msg) - { - std::map::iterator mit = grpMetas.find(msg->grpId); - -#ifdef GEN_EXCH_DEBUG - std::cerr << " msg info : grp id=" << msg->grpId << ", msg id=" << msg->msgId << std::endl; -#endif - RsGxsGrpMetaData* grpMeta = NULL ; - - // validate msg - if(mit != grpMetas.end()) - { - grpMeta = mit->second; - GxsSecurity::createPublicKeysFromPrivateKeys(grpMeta->keys); // make sure we have the public keys that correspond to the private ones, as it happens. Most of the time this call does nothing. - - validateReturn = validateMsg(msg, grpMeta->mGroupFlags, grpMeta->mSignFlags, grpMeta->keys); - -#ifdef GEN_EXCH_DEBUG - std::cerr << " grpMeta.mSignFlags: " << std::hex << grpMeta->mSignFlags << std::dec << std::endl; - std::cerr << " grpMeta.mAuthFlags: " << std::hex << grpMeta->mAuthenFlags << std::dec << std::endl; - std::cerr << " message validation result: " << (int)validateReturn << std::endl; -#endif - } - - if(validateReturn == VALIDATE_SUCCESS) - { - meta->mMsgStatus = GXS_SERV::GXS_MSG_STATUS_UNPROCESSED | GXS_SERV::GXS_MSG_STATUS_GUI_NEW | GXS_SERV::GXS_MSG_STATUS_GUI_UNREAD; - msgs.push_back(msg); - - std::vector &msgv = msgIds[msg->grpId]; - if (std::find(msgv.begin(), msgv.end(), msg->msgId) == msgv.end()) - { - msgv.push_back(msg->msgId); - } - - NxsMsgPendingVect::iterator validated_entry = std::find(mMsgPendingValidate.begin(), mMsgPendingValidate.end(), - getMsgIdPair(*msg)); - - if(validated_entry != mMsgPendingValidate.end()) mMsgPendingValidate.erase(validated_entry); - - computeHash(msg->msg, meta->mHash); - meta->recvTS = time(NULL); -#ifdef GEN_EXCH_DEBUG - std::cerr << " new status flags: " << meta->mMsgStatus << std::endl; - std::cerr << " computed hash: " << meta->mHash << std::endl; - std::cerr << "Message received. Identity=" << msg->metaData->mAuthorId << ", from peer " << msg->PeerId() << std::endl; -#endif - - if(!msg->metaData->mAuthorId.isNull()) - mRoutingClues[msg->metaData->mAuthorId].insert(msg->PeerId()) ; - } - - if(validateReturn == VALIDATE_FAIL) - { - // In this case, we notify the network exchange service not to DL the message again, at least not yet. - -#ifdef GEN_EXCH_DEBUG - std::cerr << "Notifying the network service to not download this message again." << std::endl; -#endif - messages_to_reject.push_back(msg->msgId) ; - } - } - else - { -#ifdef GEN_EXCH_DEBUG - std::cerr << " deserialisation failed!" <grpId: " << msg->grpId << ", msgId: " << msg->msgId << std::endl; -#endif - - NxsMsgPendingVect::iterator failed_entry = std::find(mMsgPendingValidate.begin(), mMsgPendingValidate.end(), - getMsgIdPair(*msg)); - - if(failed_entry != mMsgPendingValidate.end()) mMsgPendingValidate.erase(failed_entry); - delete msg; - - - } - else if(validateReturn == VALIDATE_FAIL_TRY_LATER) - { - -#ifdef GEN_EXCH_DEBUG - std::cerr << "failed to validate msg, trying again: " - << "msg->grpId: " << msg->grpId << ", msgId: " << msg->msgId << std::endl; -#endif - - RsGxsGrpMsgIdPair id; - id.first = msg->grpId; - id.second = msg->msgId; - - // first check you haven't made too many attempts - - NxsMsgPendingVect::iterator vit = std::find(mMsgPendingValidate.begin(), mMsgPendingValidate.end(), id); - - if(vit == mMsgPendingValidate.end()) - mMsgPendingValidate.push_back(GxsPendingItem(msg, id,time(NULL))); - } - } - - if(!msgIds.empty()) - { -#ifdef GEN_EXCH_DEBUG - std::cerr << " removing existing and old messages from incoming list." << std::endl; -#endif - removeDeleteExistingMessages(msgs, msgIds); - -#ifdef GEN_EXCH_DEBUG - std::cerr << " storing remaining messages" << std::endl; -#endif - mDataStore->storeMessage(msgs); - - RsGxsMsgChange* c = new RsGxsMsgChange(RsGxsNotify::TYPE_RECEIVE, false); - c->msgChangeMap = msgIds; - mNotifications.push_back(c); - } - - mReceivedMsgs.clear(); - } - - // Done off-mutex to avoid cross deadlocks in the netservice that might call the RsGenExchange as an observer.. - - if(mNetService != NULL) - for(std::list::const_iterator it(messages_to_reject.begin());it!=messages_to_reject.end();++it) - mNetService->rejectMessage(*it) ; -} -#endif - bool RsGenExchange::acceptNewGroup(const RsGxsGrpMetaData* /*grpMeta*/ ) { return true; } bool RsGenExchange::acceptNewMessage(const RsGxsMsgMetaData* /*grpMeta*/,uint32_t /*size*/ ) { return true; } @@ -3397,148 +3169,6 @@ void RsGenExchange::processRecvdGroups() } } -#ifdef OLD_VERSION -void RsGenExchange::processRecvdGroups() -{ - RS_STACK_MUTEX(mGenMtx) ; - - if(mReceivedGrps.empty()) - return; - -#ifdef GEN_EXCH_DEBUG - std::cerr << "RsGenExchange::Processing received groups" << std::endl; -#endif - NxsGrpPendValidVect::iterator vit = mReceivedGrps.begin(); - std::vector existingGrpIds; - std::list grpIds; - - RsNxsGrpDataTemporaryList grps; - - mDataStore->retrieveGroupIds(existingGrpIds); - - while( vit != mReceivedGrps.end()) - { - GxsPendingItem& gpsi = *vit; - RsNxsGrp* grp = gpsi.mItem; - - if(grp->metaData == NULL) - { - RsGxsGrpMetaData* meta = new RsGxsGrpMetaData(); - - if(grp->meta.bin_len != 0 && meta->deserialise(grp->meta.bin_data, grp->meta.bin_len)) - grp->metaData = meta ; - else - delete meta ; - } - - bool erase = true; - - if(grp->metaData != NULL && acceptNewGroup(grp->metaData)) - { -#ifdef GEN_EXCH_DEBUG - std::cerr << " processing validation for group " << meta->mGroupId << ", original attempt time: " << time(NULL) - gpsi.mFirstTryTS << " seconds ago" << std::endl; -#endif - uint8_t ret = validateGrp(grp); - - if(ret == VALIDATE_SUCCESS) - { - grp->metaData->mGroupStatus = GXS_SERV::GXS_GRP_STATUS_UNPROCESSED | GXS_SERV::GXS_GRP_STATUS_UNREAD; - - computeHash(grp->grp, grp->metaData->mHash); - - // group has been validated. Let's notify the global router for the clue - - if(!meta->mAuthorId.isNull()) - { -#ifdef GEN_EXCH_DEBUG - std::cerr << "Group routage info: Identity=" << meta->mAuthorId << " from " << grp->PeerId() << std::endl; -#endif - - mRoutingClues[meta->mAuthorId].insert(grp->PeerId()) ; - } - - // This has been moved here (as opposed to inside part for new groups below) because it is used to update the server TS when updates - // of grp metadata arrive. - - meta->mRecvTS = time(NULL); - - // now check if group already existss - if(std::find(existingGrpIds.begin(), existingGrpIds.end(), grp->grpId) == existingGrpIds.end()) - { - //if(meta->mCircleType == GXS_CIRCLE_TYPE_YOUREYESONLY) - meta->mOriginator = grp->PeerId(); - - meta->mSubscribeFlags = GXS_SERV::GROUP_SUBSCRIBE_NOT_SUBSCRIBED; - - grps.push_back(grp); - grpIds.push_back(grp->grpId); - } - else - { - GroupUpdate update; - update.newGrp = grp; - mGroupUpdates.push_back(update); - } - erase = true; - } - else if(ret == VALIDATE_FAIL) - { -#ifdef GEN_EXCH_DEBUG - std::cerr << " failed to validate incoming meta, grpId: " << grp->grpId << ": wrong signature" << std::endl; -#endif - delete grp; - erase = true; - } - else if(ret == VALIDATE_FAIL_TRY_LATER) - { - -#ifdef GEN_EXCH_DEBUG - std::cerr << " failed to validate incoming grp, trying again. grpId: " << grp->grpId << std::endl; -#endif - - if(gpsi.mFirstTryTS + VALIDATE_MAX_WAITING_TIME < time(NULL)) - { -#ifdef GEN_EXCH_DEBUG - std::cerr << " validation time got group " << grp->grpId << " exceeded maximum. Will delete group " << std::endl; -#endif - delete grp; - erase = true; - } - else - erase = false; - } - } - else - { - if(!deserialOk) - std::cerr << "(EE) deserialise error in group meta data" << std::endl; - - delete grp; - delete meta; - erase = true; - } - - if(erase) - vit = mReceivedGrps.erase(vit); - else - ++vit; - } - - if(!grpIds.empty()) - { - RsGxsGroupChange* c = new RsGxsGroupChange(RsGxsNotify::TYPE_RECEIVE, false); - c->mGrpIdList = grpIds; - mNotifications.push_back(c); - mDataStore->storeGroup(grps); -#ifdef GEN_EXCH_DEBUG - std::cerr << " adding the following grp ids to notification: " << std::endl; - for(std::list::const_iterator it(grpIds.begin());it!=grpIds.end();++it) - std::cerr << " " << *it << std::endl; -#endif - } -} -#endif - void RsGenExchange::performUpdateValidation() { RS_STACK_MUTEX(mGenMtx) ;