diff --git a/libretroshare/src/gxs/rsgenexchange.cc b/libretroshare/src/gxs/rsgenexchange.cc index 2f0a6e195..0ee1fb0e8 100644 --- a/libretroshare/src/gxs/rsgenexchange.cc +++ b/libretroshare/src/gxs/rsgenexchange.cc @@ -62,8 +62,8 @@ static const uint32_t INDEX_AUTHEN_ADMIN = 0x00000040; // admin key //#define GEN_EXCH_DEBUG 1 -#define MSG_CLEANUP_PERIOD 60*5 // 5 minutes -#define INTEGRITY_CHECK_PERIOD 60*30 // 30 minutes +#define MSG_CLEANUP_PERIOD 60*5 // 5 minutes +#define INTEGRITY_CHECK_PERIOD 60*30 // 30 minutes RsGenExchange::RsGenExchange(RsGeneralDataService *gds, RsNetworkExchangeService *ns, RsSerialType *serviceSerialiser, uint16_t servType, RsGixs* gixs, @@ -1536,6 +1536,9 @@ void RsGenExchange::notifyNewMessages(std::vector& messages) } else { +#ifdef GEN_EXCH_DEBUG + std::cerr << " message is already in pending validation list. dropping." << std::endl; +#endif delete msg; } } @@ -2656,6 +2659,16 @@ void RsGenExchange::processRecvdMessages() if(grpMeta->mSignFlags & GXS_SERV::FLAG_AUTHOR_AUTHENTICATION_TRACK_MESSAGES) mTrackingClues.push_back(std::make_pair(msg->msgId,msg->PeerId())) ; } + + if(validateReturn == VALIDATE_FAIL) + { + // In this case, we notify the network exchange service not to DL the message again, at least not yet. + +#ifdef GEN_EXCH_DEBUG + std::cerr << "Notifying the network service to not download this message again." << std::endl; +#endif + mNetService->rejectMessage(msg->msgId) ; + } } else { diff --git a/libretroshare/src/gxs/rsgxsnetservice.cc b/libretroshare/src/gxs/rsgxsnetservice.cc index 598ea7bbc..38f5110c3 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.cc +++ b/libretroshare/src/gxs/rsgxsnetservice.cc @@ -228,18 +228,33 @@ #define MAX_REQLIST_SIZE 20 // No more than 20 items per msg request list => creates smaller transactions that are less likely to be cancelled. #define TRANSAC_TIMEOUT 2000 // In seconds. Has been increased to avoid epidemic transaction cancelling due to overloaded outqueues. #define SECURITY_DELAY_TO_FORCE_CLIENT_REUPDATE 3600 // force re-update if there happens to be a large delay between our server side TS and the client side TS of friends +#define REJECTED_MESSAGE_RETRY_DELAY 24*3600 // re-try rejected messages every 24hrs. Most of the time this is because the peer's reputation has changed. // Debug system to allow to print only for some IDs (group, Peer, etc) #if defined(NXS_NET_DEBUG_0) || defined(NXS_NET_DEBUG_1) || defined(NXS_NET_DEBUG_2) || defined(NXS_NET_DEBUG_3) || defined(NXS_NET_DEBUG_4) || defined(NXS_NET_DEBUG_5) -static const RsPeerId peer_to_print ; -static const RsGxsGroupId group_id_to_print ; //= RsGxsGroupId(std::string("78a7480e7af4ae12303ec7fef2736745" )) ; // use this to allow to this group id only, or "" for all IDs -static const uint32_t service_to_print =0;//= 0x0217 ; // use this to allow to this service id only, or 0 for all services - // warning. Numbers should be SERVICE IDS (see serialiser/rsserviceids.h) +static const RsPeerId peer_to_print = RsPeerId(std::string("")) ; +static const RsGxsGroupId group_id_to_print = RsGxsGroupId(std::string("" )) ; // use this to allow to this group id only, or "" for all IDs +static const uint32_t service_to_print = 0 ; // use this to allow to this service id only, or 0 for all services + // warning. Numbers should be SERVICE IDS (see serialiser/rsserviceids.h. E.g. 0x0215 for forums) class nullstream: public std::ostream {}; +#if defined(NXS_NET_DEBUG_0) || defined(NXS_NET_DEBUG_1) || defined(NXS_NET_DEBUG_2) || defined(NXS_NET_DEBUG_3) || defined(NXS_NET_DEBUG_4) || defined(NXS_NET_DEBUG_5) +static std::string nice_time_stamp(time_t now,time_t TS) +{ + if(TS == 0) + return "Never" ; + else + { + std::ostringstream s; + s << now - TS << " secs ago" ; + return s.str() ; + } +} +#endif + static std::ostream& gxsnetdebug(const RsPeerId& peer_id,const RsGxsGroupId& grp_id,uint32_t service_type) { static nullstream null ; @@ -269,7 +284,7 @@ RsGxsNetService::RsGxsNetService(uint16_t servType, RsGeneralDataService *gds, : p3ThreadedService(), p3Config(), mTransactionN(0), mObserver(nxsObs), mDataStore(gds), mServType(servType), mTransactionTimeOut(TRANSAC_TIMEOUT), mNetMgr(netMgr), mNxsMutex("RsGxsNetService"), - mSyncTs(0), mLastKeyPublishTs(0), mSYNC_PERIOD(SYNC_PERIOD), mCircles(circles), mReputations(reputations), + mSyncTs(0), mLastKeyPublishTs(0),mLastCleanRejectedMessages(0), mSYNC_PERIOD(SYNC_PERIOD), mCircles(circles), mReputations(reputations), mPgpUtils(pgpUtils), mGrpAutoSync(grpAutoSync),mAllowMsgSync(msgAutoSync), mGrpServerUpdateItem(NULL), mServiceInfo(serviceInfo) @@ -314,12 +329,51 @@ int RsGxsNetService::tick() if(now > 10 + mLastKeyPublishTs) { sharePublishKeysPending() ; + mLastKeyPublishTs = now ; } + if(now > 3600 + mLastCleanRejectedMessages) + { + sharePublishKeysPending() ; + + mLastCleanRejectedMessages = now ; + } + cleanRejectedMessages() ; return 1; } +void RsGxsNetService::rejectMessage(const RsGxsMessageId& msg_id) +{ + RS_STACK_MUTEX(mNxsMutex) ; + + mRejectedMessages[msg_id] = time(NULL) ; +} +void RsGxsNetService::cleanRejectedMessages() +{ + RS_STACK_MUTEX(mNxsMutex) ; + time_t now = time(NULL) ; + +#ifdef NXS_NET_DEBUG_0 + GXSNETDEBUG___ << "Cleaning rejected messages." << std::endl; +#endif + + for(std::map::iterator it(mRejectedMessages.begin());it!=mRejectedMessages.end();) + if(it->second + REJECTED_MESSAGE_RETRY_DELAY < now) + { +#ifdef NXS_NET_DEBUG_0 + GXSNETDEBUG___ << " message id " << it->first << " should be re-tried. removing from list..." << std::endl; +#endif + + std::map::iterator tmp = it ; + ++tmp ; + mRejectedMessages.erase(it) ; + it=tmp ; + } + else + ++it ; +} + // This class collects outgoing items due to the broadcast of Nxs messages. It computes // a probability that can be used to temper the broadcast of items so as to match the // residual bandwidth (difference between max allowed bandwidth and current outgoing rate. @@ -466,8 +520,8 @@ void RsGxsNetService::syncWithPeers() NxsBandwidthRecorder::recordEvent(mServType,grp) ; -#ifdef NXS_NET_DEBUG_0 - GXSNETDEBUG_P_(*sit) << " sending RsNxsSyncGrp (sending back to peer the timestamp of latest group change we know about him) to peer id: " << *sit << " ts=" << updateTS << std::endl; +#ifdef NXS_NET_DEBUG_5 + GXSNETDEBUG_P_(*sit) << "Service "<< std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " sending global group TS of peer id: " << *sit << " ts=" << nice_time_stamp(time(NULL),updateTS) << " (secs ago) to himself" << std::endl; #endif sendItem(grp); } @@ -564,8 +618,8 @@ void RsGxsNetService::syncWithPeers() if(RSRandom::random_f32() < sending_probability) { sendItem(msg); -#ifdef NXS_NET_DEBUG_0 - GXSNETDEBUG_PG(*sit,grpId) << " sending RsNxsSyncMsg req (last local update TS for group+peer) for grpId=" << grpId << " to peer " << *sit << ", last TS=" << std::dec<< time(NULL) - updateTS << " secs ago." << std::endl; +#ifdef NXS_NET_DEBUG_5 + GXSNETDEBUG_PG(*sit,grpId) << "Service "<< std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " sending global message TS of peer id: " << *sit << " ts=" << nice_time_stamp(time(NULL),updateTS) << " (secs ago) for group " << grpId << " to himself" << std::endl; #endif } else @@ -1395,23 +1449,9 @@ void RsGxsNetService::data_tick() processExplicitGroupRequests(); } -#if defined(NXS_NET_DEBUG_0) || defined(NXS_NET_DEBUG_1) || defined(NXS_NET_DEBUG_2) || defined(NXS_NET_DEBUG_3) || defined(NXS_NET_DEBUG_4) -static std::string nice_time_stamp(time_t now,time_t TS) -{ - if(TS == 0) - return "Never" ; - else - { - std::ostringstream s; - s << now - TS << " secs ago" ; - return s.str() ; - } -} -#endif - void RsGxsNetService::debugDump() { -#ifdef NXS_NET_DEBUG_1 +#ifdef NXS_NET_DEBUG_0 RS_STACK_MUTEX(mNxsMutex) ; time_t now = time(NULL) ; @@ -1441,6 +1481,8 @@ void RsGxsNetService::debugDump() for(std::map::const_iterator it2(it->second->msgUpdateInfos.begin());it2!=it->second->msgUpdateInfos.end();++it2) GXSNETDEBUG_PG(it->first,it2->first) << " group " << it2->first << " - last updated at peer (secs ago): " << nice_time_stamp(now,it2->second.time_stamp) << ". Message count=" << it2->second.message_count << std::endl; } + + GXSNETDEBUG___<< " List of rejected message ids: " << mRejectedMessages.size() << std::endl; #endif } @@ -2161,6 +2203,10 @@ void RsGxsNetService::locked_pushMsgTransactionFromList(std::list& r GXSNETDEBUG_P_(peerId) << " nelems = " << reqList.size() << std::endl; GXSNETDEBUG_P_(peerId) << " peerId = " << peerId << std::endl; GXSNETDEBUG_P_(peerId) << " transN = " << transN << std::endl; +#endif +#ifdef NXS_NET_DEBUG_5 + GXSNETDEBUG_P_ (peerId) << "Service " << std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " - sending message request to peer " + << peerId << " for " << reqList.size() << " messages" << std::endl; #endif RsNxsTransac* transac = new RsNxsTransac(mServType); transac->transactFlag = RsNxsTransac::FLAG_TYPE_MSG_LIST_REQ @@ -2361,6 +2407,15 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr) continue ; } + if(mRejectedMessages.find(msgId) != mRejectedMessages.end()) + { +#ifdef NXS_NET_DEBUG_1 + GXSNETDEBUG_PG(item->PeerId(),grpId) << ", message has been recently rejected. Not requesting message!" << std::endl; +#endif + continue ; + } + + if(mReputations->haveReputation(syncItem->authorId) || noAuthor) { GixsReputation rep; @@ -2445,10 +2500,9 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr) } else { - // The list to req is empty. That means we already have all messages that this peer can - // provide. So we can stamp the group from this peer to be up to date. -#warning we should use tr->mTransaction->updateTS instead of time(NULL) - locked_stampPeerGroupUpdateTime(pid,grpId,tr->mTransaction->updateTS,msgItemL.size()) ; + // The list to req is empty. That means we already have all messages that this peer can + // provide. So we can stamp the group from this peer to be up to date. + locked_stampPeerGroupUpdateTime(pid,grpId,tr->mTransaction->updateTS,msgItemL.size()) ; } if(grpMeta) delete grpMeta; @@ -2481,6 +2535,10 @@ void RsGxsNetService::locked_pushGrpTransactionFromList( std::list& GXSNETDEBUG_P_(peerId) << " nelems = " << reqList.size() << std::endl; GXSNETDEBUG_P_(peerId) << " peerId = " << peerId << std::endl; GXSNETDEBUG_P_(peerId) << " transN = " << transN << std::endl; +#endif +#ifdef NXS_NET_DEBUG_5 + GXSNETDEBUG_P_ (peerId) << "Service " << std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " - sending group request to peer " + << peerId << " for " << reqList.size() << " groups" << std::endl; #endif RsNxsTransac* transac = new RsNxsTransac(mServType); transac->transactFlag = RsNxsTransac::FLAG_TYPE_GRP_LIST_REQ @@ -2705,6 +2763,10 @@ void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr) if(mGrpServerUpdateItem) updateTS = mGrpServerUpdateItem->grpUpdateTS; +#ifdef NXS_NET_DEBUG_5 + GXSNETDEBUG_P_ (tr->mTransaction->PeerId()) << "Service " << std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " - sending global group TS " + << updateTS << " to peer " << tr->mTransaction->PeerId() << std::endl; +#endif RsNxsTransac* ntr = new RsNxsTransac(mServType); ntr->transactionNumber = transN; ntr->transactFlag = RsNxsTransac::FLAG_BEGIN_P1 | RsNxsTransac::FLAG_TYPE_GRPS; @@ -2903,6 +2965,10 @@ void RsGxsNetService::locked_genSendMsgsTransaction(NxsTransaction* tr) newTr->mTransaction->PeerId(mOwnId); newTr->mTimeOut = time(NULL) + mTransactionTimeOut; +#ifdef NXS_NET_DEBUG_5 + GXSNETDEBUG_PG (peerId,grpId) << "Service " << std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " - sending message update to peer " + << peerId << " for group " << grpId << " with TS=" << nice_time_stamp(time(NULL),updateTS) <<" (secs ago)" << std::endl; +#endif ntr->PeerId(tr->mTransaction->PeerId()); sendItem(ntr); @@ -2975,6 +3041,10 @@ void RsGxsNetService::locked_pushGrpRespFromList(std::list& respList tr->mTransaction->PeerId(mOwnId); tr->mTimeOut = time(NULL) + mTransactionTimeOut; // signal peer to prepare for transaction +#ifdef NXS_NET_DEBUG_5 + GXSNETDEBUG_P_ (peer) << "Service " << std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " - sending group response to peer " + << peer << " with " << respList.size() << " groups " << std::endl; +#endif sendItem(trItem); locked_addTransaction(tr); } @@ -3465,6 +3535,10 @@ void RsGxsNetService::locked_pushMsgRespFromList(std::list& itemL, c tr->mTransaction->PeerId(mOwnId); tr->mTimeOut = time(NULL) + mTransactionTimeOut; +#ifdef NXS_NET_DEBUG_5 + GXSNETDEBUG_P_ (sslId) << "Service " << std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " - sending messages response to peer " + << sslId << " with " << itemL.size() << " messages " << std::endl; +#endif // signal peer to prepare for transaction sendItem(trItem); diff --git a/libretroshare/src/gxs/rsgxsnetservice.h b/libretroshare/src/gxs/rsgxsnetservice.h index 44f53fda4..6aa23d9d8 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.h +++ b/libretroshare/src/gxs/rsgxsnetservice.h @@ -151,6 +151,8 @@ public: */ virtual void subscribeStatusChanged(const RsGxsGroupId& id,bool subscribed) ; + virtual void rejectMessage(const RsGxsMessageId& msg_id) ; + /* p3Config methods */ public: @@ -438,6 +440,7 @@ private: void locked_stampPeerGroupUpdateTime(const RsPeerId& pid,const RsGxsGroupId& grpId,time_t tm,uint32_t n_messages) ; + void cleanRejectedMessages(); private: @@ -476,6 +479,7 @@ private: uint32_t mSyncTs; uint32_t mLastKeyPublishTs; + uint32_t mLastCleanRejectedMessages; const uint32_t mSYNC_PERIOD; int mUpdateCounter ; @@ -513,6 +517,8 @@ private: RsGxsServerGrpUpdateItem* mGrpServerUpdateItem; RsServiceInfo mServiceInfo; + std::map mRejectedMessages; + void debugDump(); }; diff --git a/libretroshare/src/gxs/rsnxs.h b/libretroshare/src/gxs/rsnxs.h index 8d53f2147..746d7649f 100644 --- a/libretroshare/src/gxs/rsnxs.h +++ b/libretroshare/src/gxs/rsnxs.h @@ -127,6 +127,13 @@ public: */ virtual int sharePublishKey(const RsGxsGroupId& grpId,const std::set& peers)=0 ; + /*! + * \brief rejectMessage + * Tells the network exchange service to not download this message again, at least for some time (maybe 24h or more) + * in order to avoid cluttering the network pipe with copied of this rejected message. + * \param msgId + */ + virtual void rejectMessage(const RsGxsMessageId& msgId) =0; }; #endif // RSGNP_H