added rejection list to gxsnetservice that is fed by calls from GenExchange system, to avoid infinitely re-downloading rejected messages due to bad reputation, bad signatures, missing ids, etc

This commit is contained in:
csoler 2015-12-17 23:08:02 -05:00
parent 55e66d090c
commit e8b881b2f1
4 changed files with 130 additions and 30 deletions

View File

@ -62,8 +62,8 @@ static const uint32_t INDEX_AUTHEN_ADMIN = 0x00000040; // admin key
//#define GEN_EXCH_DEBUG 1
#define MSG_CLEANUP_PERIOD 60*5 // 5 minutes
#define INTEGRITY_CHECK_PERIOD 60*30 // 30 minutes
#define MSG_CLEANUP_PERIOD 60*5 // 5 minutes
#define INTEGRITY_CHECK_PERIOD 60*30 // 30 minutes
RsGenExchange::RsGenExchange(RsGeneralDataService *gds, RsNetworkExchangeService *ns,
RsSerialType *serviceSerialiser, uint16_t servType, RsGixs* gixs,
@ -1536,6 +1536,9 @@ void RsGenExchange::notifyNewMessages(std::vector<RsNxsMsg *>& messages)
std::cerr << " message is already in pending validation list. dropping." << std::endl;
delete msg;
@ -2656,6 +2659,16 @@ void RsGenExchange::processRecvdMessages()
mTrackingClues.push_back(std::make_pair(msg->msgId,msg->PeerId())) ;
if(validateReturn == VALIDATE_FAIL)
// In this case, we notify the network exchange service not to DL the message again, at least not yet.
std::cerr << "Notifying the network service to not download this message again." << std::endl;
mNetService->rejectMessage(msg->msgId) ;

View File

@ -228,18 +228,33 @@
#define MAX_REQLIST_SIZE 20 // No more than 20 items per msg request list => creates smaller transactions that are less likely to be cancelled.
#define TRANSAC_TIMEOUT 2000 // In seconds. Has been increased to avoid epidemic transaction cancelling due to overloaded outqueues.
#define SECURITY_DELAY_TO_FORCE_CLIENT_REUPDATE 3600 // force re-update if there happens to be a large delay between our server side TS and the client side TS of friends
#define REJECTED_MESSAGE_RETRY_DELAY 24*3600 // re-try rejected messages every 24hrs. Most of the time this is because the peer's reputation has changed.
// Debug system to allow to print only for some IDs (group, Peer, etc)
#if defined(NXS_NET_DEBUG_0) || defined(NXS_NET_DEBUG_1) || defined(NXS_NET_DEBUG_2) || defined(NXS_NET_DEBUG_3) || defined(NXS_NET_DEBUG_4) || defined(NXS_NET_DEBUG_5)
static const RsPeerId peer_to_print ;
static const RsGxsGroupId group_id_to_print ; //= RsGxsGroupId(std::string("78a7480e7af4ae12303ec7fef2736745" )) ; // use this to allow to this group id only, or "" for all IDs
static const uint32_t service_to_print =0;//= 0x0217 ; // use this to allow to this service id only, or 0 for all services
// warning. Numbers should be SERVICE IDS (see serialiser/rsserviceids.h)
static const RsPeerId peer_to_print = RsPeerId(std::string("")) ;
static const RsGxsGroupId group_id_to_print = RsGxsGroupId(std::string("" )) ; // use this to allow to this group id only, or "" for all IDs
static const uint32_t service_to_print = 0 ; // use this to allow to this service id only, or 0 for all services
// warning. Numbers should be SERVICE IDS (see serialiser/rsserviceids.h. E.g. 0x0215 for forums)
class nullstream: public std::ostream {};
#if defined(NXS_NET_DEBUG_0) || defined(NXS_NET_DEBUG_1) || defined(NXS_NET_DEBUG_2) || defined(NXS_NET_DEBUG_3) || defined(NXS_NET_DEBUG_4) || defined(NXS_NET_DEBUG_5)
static std::string nice_time_stamp(time_t now,time_t TS)
if(TS == 0)
return "Never" ;
std::ostringstream s;
s << now - TS << " secs ago" ;
return s.str() ;
static std::ostream& gxsnetdebug(const RsPeerId& peer_id,const RsGxsGroupId& grp_id,uint32_t service_type)
static nullstream null ;
@ -269,7 +284,7 @@ RsGxsNetService::RsGxsNetService(uint16_t servType, RsGeneralDataService *gds,
: p3ThreadedService(), p3Config(), mTransactionN(0),
mObserver(nxsObs), mDataStore(gds), mServType(servType),
mTransactionTimeOut(TRANSAC_TIMEOUT), mNetMgr(netMgr), mNxsMutex("RsGxsNetService"),
mSyncTs(0), mLastKeyPublishTs(0), mSYNC_PERIOD(SYNC_PERIOD), mCircles(circles), mReputations(reputations),
mSyncTs(0), mLastKeyPublishTs(0),mLastCleanRejectedMessages(0), mSYNC_PERIOD(SYNC_PERIOD), mCircles(circles), mReputations(reputations),
mGrpAutoSync(grpAutoSync),mAllowMsgSync(msgAutoSync), mGrpServerUpdateItem(NULL),
@ -314,12 +329,51 @@ int RsGxsNetService::tick()
if(now > 10 + mLastKeyPublishTs)
sharePublishKeysPending() ;
mLastKeyPublishTs = now ;
if(now > 3600 + mLastCleanRejectedMessages)
sharePublishKeysPending() ;
mLastCleanRejectedMessages = now ;
cleanRejectedMessages() ;
return 1;
void RsGxsNetService::rejectMessage(const RsGxsMessageId& msg_id)
mRejectedMessages[msg_id] = time(NULL) ;
void RsGxsNetService::cleanRejectedMessages()
time_t now = time(NULL) ;
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG___ << "Cleaning rejected messages." << std::endl;
for(std::map<RsGxsMessageId,time_t>::iterator it(mRejectedMessages.begin());it!=mRejectedMessages.end();)
if(it->second + REJECTED_MESSAGE_RETRY_DELAY < now)
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG___ << " message id " << it->first << " should be re-tried. removing from list..." << std::endl;
std::map<RsGxsMessageId,time_t>::iterator tmp = it ;
++tmp ;
mRejectedMessages.erase(it) ;
it=tmp ;
++it ;
// This class collects outgoing items due to the broadcast of Nxs messages. It computes
// a probability that can be used to temper the broadcast of items so as to match the
// residual bandwidth (difference between max allowed bandwidth and current outgoing rate.
@ -466,8 +520,8 @@ void RsGxsNetService::syncWithPeers()
NxsBandwidthRecorder::recordEvent(mServType,grp) ;
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_P_(*sit) << " sending RsNxsSyncGrp (sending back to peer the timestamp of latest group change we know about him) to peer id: " << *sit << " ts=" << updateTS << std::endl;
#ifdef NXS_NET_DEBUG_5
GXSNETDEBUG_P_(*sit) << "Service "<< std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " sending global group TS of peer id: " << *sit << " ts=" << nice_time_stamp(time(NULL),updateTS) << " (secs ago) to himself" << std::endl;
@ -564,8 +618,8 @@ void RsGxsNetService::syncWithPeers()
if(RSRandom::random_f32() < sending_probability)
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_PG(*sit,grpId) << " sending RsNxsSyncMsg req (last local update TS for group+peer) for grpId=" << grpId << " to peer " << *sit << ", last TS=" << std::dec<< time(NULL) - updateTS << " secs ago." << std::endl;
#ifdef NXS_NET_DEBUG_5
GXSNETDEBUG_PG(*sit,grpId) << "Service "<< std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " sending global message TS of peer id: " << *sit << " ts=" << nice_time_stamp(time(NULL),updateTS) << " (secs ago) for group " << grpId << " to himself" << std::endl;
@ -1395,23 +1449,9 @@ void RsGxsNetService::data_tick()
#if defined(NXS_NET_DEBUG_0) || defined(NXS_NET_DEBUG_1) || defined(NXS_NET_DEBUG_2) || defined(NXS_NET_DEBUG_3) || defined(NXS_NET_DEBUG_4)
static std::string nice_time_stamp(time_t now,time_t TS)
if(TS == 0)
return "Never" ;
std::ostringstream s;
s << now - TS << " secs ago" ;
return s.str() ;
void RsGxsNetService::debugDump()
#ifdef NXS_NET_DEBUG_1
#ifdef NXS_NET_DEBUG_0
time_t now = time(NULL) ;
@ -1441,6 +1481,8 @@ void RsGxsNetService::debugDump()
for(std::map<RsGxsGroupId, RsGxsMsgUpdateItem::MsgUpdateInfo>::const_iterator it2(it->second->msgUpdateInfos.begin());it2!=it->second->msgUpdateInfos.end();++it2)
GXSNETDEBUG_PG(it->first,it2->first) << " group " << it2->first << " - last updated at peer (secs ago): " << nice_time_stamp(now,it2->second.time_stamp) << ". Message count=" << it2->second.message_count << std::endl;
GXSNETDEBUG___<< " List of rejected message ids: " << mRejectedMessages.size() << std::endl;
@ -2161,6 +2203,10 @@ void RsGxsNetService::locked_pushMsgTransactionFromList(std::list<RsNxsItem*>& r
GXSNETDEBUG_P_(peerId) << " nelems = " << reqList.size() << std::endl;
GXSNETDEBUG_P_(peerId) << " peerId = " << peerId << std::endl;
GXSNETDEBUG_P_(peerId) << " transN = " << transN << std::endl;
#ifdef NXS_NET_DEBUG_5
GXSNETDEBUG_P_ (peerId) << "Service " << std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " - sending message request to peer "
<< peerId << " for " << reqList.size() << " messages" << std::endl;
RsNxsTransac* transac = new RsNxsTransac(mServType);
transac->transactFlag = RsNxsTransac::FLAG_TYPE_MSG_LIST_REQ
@ -2361,6 +2407,15 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
continue ;
if(mRejectedMessages.find(msgId) != mRejectedMessages.end())
#ifdef NXS_NET_DEBUG_1
GXSNETDEBUG_PG(item->PeerId(),grpId) << ", message has been recently rejected. Not requesting message!" << std::endl;
continue ;
if(mReputations->haveReputation(syncItem->authorId) || noAuthor)
GixsReputation rep;
@ -2445,10 +2500,9 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
// The list to req is empty. That means we already have all messages that this peer can
// provide. So we can stamp the group from this peer to be up to date.
#warning we should use tr->mTransaction->updateTS instead of time(NULL)
locked_stampPeerGroupUpdateTime(pid,grpId,tr->mTransaction->updateTS,msgItemL.size()) ;
// The list to req is empty. That means we already have all messages that this peer can
// provide. So we can stamp the group from this peer to be up to date.
locked_stampPeerGroupUpdateTime(pid,grpId,tr->mTransaction->updateTS,msgItemL.size()) ;
delete grpMeta;
@ -2481,6 +2535,10 @@ void RsGxsNetService::locked_pushGrpTransactionFromList( std::list<RsNxsItem*>&
GXSNETDEBUG_P_(peerId) << " nelems = " << reqList.size() << std::endl;
GXSNETDEBUG_P_(peerId) << " peerId = " << peerId << std::endl;
GXSNETDEBUG_P_(peerId) << " transN = " << transN << std::endl;
#ifdef NXS_NET_DEBUG_5
GXSNETDEBUG_P_ (peerId) << "Service " << std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " - sending group request to peer "
<< peerId << " for " << reqList.size() << " groups" << std::endl;
RsNxsTransac* transac = new RsNxsTransac(mServType);
transac->transactFlag = RsNxsTransac::FLAG_TYPE_GRP_LIST_REQ
@ -2705,6 +2763,10 @@ void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr)
updateTS = mGrpServerUpdateItem->grpUpdateTS;
#ifdef NXS_NET_DEBUG_5
GXSNETDEBUG_P_ (tr->mTransaction->PeerId()) << "Service " << std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " - sending global group TS "
<< updateTS << " to peer " << tr->mTransaction->PeerId() << std::endl;
RsNxsTransac* ntr = new RsNxsTransac(mServType);
ntr->transactionNumber = transN;
ntr->transactFlag = RsNxsTransac::FLAG_BEGIN_P1 | RsNxsTransac::FLAG_TYPE_GRPS;
@ -2903,6 +2965,10 @@ void RsGxsNetService::locked_genSendMsgsTransaction(NxsTransaction* tr)
newTr->mTimeOut = time(NULL) + mTransactionTimeOut;
#ifdef NXS_NET_DEBUG_5
GXSNETDEBUG_PG (peerId,grpId) << "Service " << std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " - sending message update to peer "
<< peerId << " for group " << grpId << " with TS=" << nice_time_stamp(time(NULL),updateTS) <<" (secs ago)" << std::endl;
@ -2975,6 +3041,10 @@ void RsGxsNetService::locked_pushGrpRespFromList(std::list<RsNxsItem*>& respList
tr->mTimeOut = time(NULL) + mTransactionTimeOut;
// signal peer to prepare for transaction
#ifdef NXS_NET_DEBUG_5
GXSNETDEBUG_P_ (peer) << "Service " << std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " - sending group response to peer "
<< peer << " with " << respList.size() << " groups " << std::endl;
@ -3465,6 +3535,10 @@ void RsGxsNetService::locked_pushMsgRespFromList(std::list<RsNxsItem*>& itemL, c
tr->mTimeOut = time(NULL) + mTransactionTimeOut;
#ifdef NXS_NET_DEBUG_5
GXSNETDEBUG_P_ (sslId) << "Service " << std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " - sending messages response to peer "
<< sslId << " with " << itemL.size() << " messages " << std::endl;
// signal peer to prepare for transaction

View File

@ -151,6 +151,8 @@ public:
virtual void subscribeStatusChanged(const RsGxsGroupId& id,bool subscribed) ;
virtual void rejectMessage(const RsGxsMessageId& msg_id) ;
/* p3Config methods */
@ -438,6 +440,7 @@ private:
void locked_stampPeerGroupUpdateTime(const RsPeerId& pid,const RsGxsGroupId& grpId,time_t tm,uint32_t n_messages) ;
void cleanRejectedMessages();
@ -476,6 +479,7 @@ private:
uint32_t mSyncTs;
uint32_t mLastKeyPublishTs;
uint32_t mLastCleanRejectedMessages;
const uint32_t mSYNC_PERIOD;
int mUpdateCounter ;
@ -513,6 +517,8 @@ private:
RsGxsServerGrpUpdateItem* mGrpServerUpdateItem;
RsServiceInfo mServiceInfo;
std::map<RsGxsMessageId,time_t> mRejectedMessages;
void debugDump();

View File

@ -127,6 +127,13 @@ public:
virtual int sharePublishKey(const RsGxsGroupId& grpId,const std::set<RsPeerId>& peers)=0 ;
* \brief rejectMessage
* Tells the network exchange service to not download this message again, at least for some time (maybe 24h or more)
* in order to avoid cluttering the network pipe with copied of this rejected message.
* \param msgId
virtual void rejectMessage(const RsGxsMessageId& msgId) =0;
#endif // RSGNP_H