Merge pull request from csoler/v0.6-GxsDebug

added rejection list to gxsnetservice that is fed by calls from GenEx…
This commit is contained in:
Cyril Soler 2015-12-17 23:10:02 -05:00
commit 374aa65c66
4 changed files with 130 additions and 30 deletions

@ -1536,6 +1536,9 @@ void RsGenExchange::notifyNewMessages(std::vector<RsNxsMsg *>& messages)
} }
else else
{ {
#ifdef GEN_EXCH_DEBUG
std::cerr << " message is already in pending validation list. dropping." << std::endl;
#endif
delete msg; delete msg;
} }
} }
@ -2656,6 +2659,16 @@ void RsGenExchange::processRecvdMessages()
if(grpMeta->mSignFlags & GXS_SERV::FLAG_AUTHOR_AUTHENTICATION_TRACK_MESSAGES) if(grpMeta->mSignFlags & GXS_SERV::FLAG_AUTHOR_AUTHENTICATION_TRACK_MESSAGES)
mTrackingClues.push_back(std::make_pair(msg->msgId,msg->PeerId())) ; mTrackingClues.push_back(std::make_pair(msg->msgId,msg->PeerId())) ;
} }
if(validateReturn == VALIDATE_FAIL)
{
// In this case, we notify the network exchange service not to DL the message again, at least not yet.
#ifdef GEN_EXCH_DEBUG
std::cerr << "Notifying the network service to not download this message again." << std::endl;
#endif
mNetService->rejectMessage(msg->msgId) ;
}
} }
else else
{ {

@ -228,18 +228,33 @@
#define MAX_REQLIST_SIZE 20 // No more than 20 items per msg request list => creates smaller transactions that are less likely to be cancelled. #define MAX_REQLIST_SIZE 20 // No more than 20 items per msg request list => creates smaller transactions that are less likely to be cancelled.
#define TRANSAC_TIMEOUT 2000 // In seconds. Has been increased to avoid epidemic transaction cancelling due to overloaded outqueues. #define TRANSAC_TIMEOUT 2000 // In seconds. Has been increased to avoid epidemic transaction cancelling due to overloaded outqueues.
#define SECURITY_DELAY_TO_FORCE_CLIENT_REUPDATE 3600 // force re-update if there happens to be a large delay between our server side TS and the client side TS of friends #define SECURITY_DELAY_TO_FORCE_CLIENT_REUPDATE 3600 // force re-update if there happens to be a large delay between our server side TS and the client side TS of friends
#define REJECTED_MESSAGE_RETRY_DELAY 24*3600 // re-try rejected messages every 24hrs. Most of the time this is because the peer's reputation has changed.
// Debug system to allow to print only for some IDs (group, Peer, etc) // Debug system to allow to print only for some IDs (group, Peer, etc)
#if defined(NXS_NET_DEBUG_0) || defined(NXS_NET_DEBUG_1) || defined(NXS_NET_DEBUG_2) || defined(NXS_NET_DEBUG_3) || defined(NXS_NET_DEBUG_4) || defined(NXS_NET_DEBUG_5) #if defined(NXS_NET_DEBUG_0) || defined(NXS_NET_DEBUG_1) || defined(NXS_NET_DEBUG_2) || defined(NXS_NET_DEBUG_3) || defined(NXS_NET_DEBUG_4) || defined(NXS_NET_DEBUG_5)
static const RsPeerId peer_to_print ; static const RsPeerId peer_to_print = RsPeerId(std::string("")) ;
static const RsGxsGroupId group_id_to_print ; //= RsGxsGroupId(std::string("78a7480e7af4ae12303ec7fef2736745" )) ; // use this to allow to this group id only, or "" for all IDs static const RsGxsGroupId group_id_to_print = RsGxsGroupId(std::string("" )) ; // use this to allow to this group id only, or "" for all IDs
static const uint32_t service_to_print =0;//= 0x0217 ; // use this to allow to this service id only, or 0 for all services static const uint32_t service_to_print = 0 ; // use this to allow to this service id only, or 0 for all services
// warning. Numbers should be SERVICE IDS (see serialiser/rsserviceids.h) // warning. Numbers should be SERVICE IDS (see serialiser/rsserviceids.h. E.g. 0x0215 for forums)
class nullstream: public std::ostream {}; class nullstream: public std::ostream {};
#if defined(NXS_NET_DEBUG_0) || defined(NXS_NET_DEBUG_1) || defined(NXS_NET_DEBUG_2) || defined(NXS_NET_DEBUG_3) || defined(NXS_NET_DEBUG_4) || defined(NXS_NET_DEBUG_5)
static std::string nice_time_stamp(time_t now,time_t TS)
{
if(TS == 0)
return "Never" ;
else
{
std::ostringstream s;
s << now - TS << " secs ago" ;
return s.str() ;
}
}
#endif
static std::ostream& gxsnetdebug(const RsPeerId& peer_id,const RsGxsGroupId& grp_id,uint32_t service_type) static std::ostream& gxsnetdebug(const RsPeerId& peer_id,const RsGxsGroupId& grp_id,uint32_t service_type)
{ {
static nullstream null ; static nullstream null ;
@ -269,7 +284,7 @@ RsGxsNetService::RsGxsNetService(uint16_t servType, RsGeneralDataService *gds,
: p3ThreadedService(), p3Config(), mTransactionN(0), : p3ThreadedService(), p3Config(), mTransactionN(0),
mObserver(nxsObs), mDataStore(gds), mServType(servType), mObserver(nxsObs), mDataStore(gds), mServType(servType),
mTransactionTimeOut(TRANSAC_TIMEOUT), mNetMgr(netMgr), mNxsMutex("RsGxsNetService"), mTransactionTimeOut(TRANSAC_TIMEOUT), mNetMgr(netMgr), mNxsMutex("RsGxsNetService"),
mSyncTs(0), mLastKeyPublishTs(0), mSYNC_PERIOD(SYNC_PERIOD), mCircles(circles), mReputations(reputations), mSyncTs(0), mLastKeyPublishTs(0),mLastCleanRejectedMessages(0), mSYNC_PERIOD(SYNC_PERIOD), mCircles(circles), mReputations(reputations),
mPgpUtils(pgpUtils), mPgpUtils(pgpUtils),
mGrpAutoSync(grpAutoSync),mAllowMsgSync(msgAutoSync), mGrpServerUpdateItem(NULL), mGrpAutoSync(grpAutoSync),mAllowMsgSync(msgAutoSync), mGrpServerUpdateItem(NULL),
mServiceInfo(serviceInfo) mServiceInfo(serviceInfo)
@ -314,12 +329,51 @@ int RsGxsNetService::tick()
if(now > 10 + mLastKeyPublishTs) if(now > 10 + mLastKeyPublishTs)
{ {
sharePublishKeysPending() ; sharePublishKeysPending() ;
mLastKeyPublishTs = now ; mLastKeyPublishTs = now ;
} }
if(now > 3600 + mLastCleanRejectedMessages)
{
sharePublishKeysPending() ;
mLastCleanRejectedMessages = now ;
}
cleanRejectedMessages() ;
return 1; return 1;
} }
void RsGxsNetService::rejectMessage(const RsGxsMessageId& msg_id)
{
RS_STACK_MUTEX(mNxsMutex) ;
mRejectedMessages[msg_id] = time(NULL) ;
}
void RsGxsNetService::cleanRejectedMessages()
{
RS_STACK_MUTEX(mNxsMutex) ;
time_t now = time(NULL) ;
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG___ << "Cleaning rejected messages." << std::endl;
#endif
for(std::map<RsGxsMessageId,time_t>::iterator it(mRejectedMessages.begin());it!=mRejectedMessages.end();)
if(it->second + REJECTED_MESSAGE_RETRY_DELAY < now)
{
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG___ << " message id " << it->first << " should be re-tried. removing from list..." << std::endl;
#endif
std::map<RsGxsMessageId,time_t>::iterator tmp = it ;
++tmp ;
mRejectedMessages.erase(it) ;
it=tmp ;
}
else
++it ;
}
// This class collects outgoing items due to the broadcast of Nxs messages. It computes // This class collects outgoing items due to the broadcast of Nxs messages. It computes
// a probability that can be used to temper the broadcast of items so as to match the // a probability that can be used to temper the broadcast of items so as to match the
// residual bandwidth (difference between max allowed bandwidth and current outgoing rate. // residual bandwidth (difference between max allowed bandwidth and current outgoing rate.
@ -466,8 +520,8 @@ void RsGxsNetService::syncWithPeers()
NxsBandwidthRecorder::recordEvent(mServType,grp) ; NxsBandwidthRecorder::recordEvent(mServType,grp) ;
#ifdef NXS_NET_DEBUG_0 #ifdef NXS_NET_DEBUG_5
GXSNETDEBUG_P_(*sit) << " sending RsNxsSyncGrp (sending back to peer the timestamp of latest group change we know about him) to peer id: " << *sit << " ts=" << updateTS << std::endl; GXSNETDEBUG_P_(*sit) << "Service "<< std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " sending global group TS of peer id: " << *sit << " ts=" << nice_time_stamp(time(NULL),updateTS) << " (secs ago) to himself" << std::endl;
#endif #endif
sendItem(grp); sendItem(grp);
} }
@ -564,8 +618,8 @@ void RsGxsNetService::syncWithPeers()
if(RSRandom::random_f32() < sending_probability) if(RSRandom::random_f32() < sending_probability)
{ {
sendItem(msg); sendItem(msg);
#ifdef NXS_NET_DEBUG_0 #ifdef NXS_NET_DEBUG_5
GXSNETDEBUG_PG(*sit,grpId) << " sending RsNxsSyncMsg req (last local update TS for group+peer) for grpId=" << grpId << " to peer " << *sit << ", last TS=" << std::dec<< time(NULL) - updateTS << " secs ago." << std::endl; GXSNETDEBUG_PG(*sit,grpId) << "Service "<< std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " sending global message TS of peer id: " << *sit << " ts=" << nice_time_stamp(time(NULL),updateTS) << " (secs ago) for group " << grpId << " to himself" << std::endl;
#endif #endif
} }
else else
@ -1395,23 +1449,9 @@ void RsGxsNetService::data_tick()
processExplicitGroupRequests(); processExplicitGroupRequests();
} }
#if defined(NXS_NET_DEBUG_0) || defined(NXS_NET_DEBUG_1) || defined(NXS_NET_DEBUG_2) || defined(NXS_NET_DEBUG_3) || defined(NXS_NET_DEBUG_4)
static std::string nice_time_stamp(time_t now,time_t TS)
{
if(TS == 0)
return "Never" ;
else
{
std::ostringstream s;
s << now - TS << " secs ago" ;
return s.str() ;
}
}
#endif
void RsGxsNetService::debugDump() void RsGxsNetService::debugDump()
{ {
#ifdef NXS_NET_DEBUG_1 #ifdef NXS_NET_DEBUG_0
RS_STACK_MUTEX(mNxsMutex) ; RS_STACK_MUTEX(mNxsMutex) ;
time_t now = time(NULL) ; time_t now = time(NULL) ;
@ -1441,6 +1481,8 @@ void RsGxsNetService::debugDump()
for(std::map<RsGxsGroupId, RsGxsMsgUpdateItem::MsgUpdateInfo>::const_iterator it2(it->second->msgUpdateInfos.begin());it2!=it->second->msgUpdateInfos.end();++it2) for(std::map<RsGxsGroupId, RsGxsMsgUpdateItem::MsgUpdateInfo>::const_iterator it2(it->second->msgUpdateInfos.begin());it2!=it->second->msgUpdateInfos.end();++it2)
GXSNETDEBUG_PG(it->first,it2->first) << " group " << it2->first << " - last updated at peer (secs ago): " << nice_time_stamp(now,it2->second.time_stamp) << ". Message count=" << it2->second.message_count << std::endl; GXSNETDEBUG_PG(it->first,it2->first) << " group " << it2->first << " - last updated at peer (secs ago): " << nice_time_stamp(now,it2->second.time_stamp) << ". Message count=" << it2->second.message_count << std::endl;
} }
GXSNETDEBUG___<< " List of rejected message ids: " << mRejectedMessages.size() << std::endl;
#endif #endif
} }
@ -2161,6 +2203,10 @@ void RsGxsNetService::locked_pushMsgTransactionFromList(std::list<RsNxsItem*>& r
GXSNETDEBUG_P_(peerId) << " nelems = " << reqList.size() << std::endl; GXSNETDEBUG_P_(peerId) << " nelems = " << reqList.size() << std::endl;
GXSNETDEBUG_P_(peerId) << " peerId = " << peerId << std::endl; GXSNETDEBUG_P_(peerId) << " peerId = " << peerId << std::endl;
GXSNETDEBUG_P_(peerId) << " transN = " << transN << std::endl; GXSNETDEBUG_P_(peerId) << " transN = " << transN << std::endl;
#endif
#ifdef NXS_NET_DEBUG_5
GXSNETDEBUG_P_ (peerId) << "Service " << std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " - sending message request to peer "
<< peerId << " for " << reqList.size() << " messages" << std::endl;
#endif #endif
RsNxsTransac* transac = new RsNxsTransac(mServType); RsNxsTransac* transac = new RsNxsTransac(mServType);
transac->transactFlag = RsNxsTransac::FLAG_TYPE_MSG_LIST_REQ transac->transactFlag = RsNxsTransac::FLAG_TYPE_MSG_LIST_REQ
@ -2361,6 +2407,15 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
continue ; continue ;
} }
if(mRejectedMessages.find(msgId) != mRejectedMessages.end())
{
#ifdef NXS_NET_DEBUG_1
GXSNETDEBUG_PG(item->PeerId(),grpId) << ", message has been recently rejected. Not requesting message!" << std::endl;
#endif
continue ;
}
if(mReputations->haveReputation(syncItem->authorId) || noAuthor) if(mReputations->haveReputation(syncItem->authorId) || noAuthor)
{ {
GixsReputation rep; GixsReputation rep;
@ -2447,7 +2502,6 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
{ {
// The list to req is empty. That means we already have all messages that this peer can // The list to req is empty. That means we already have all messages that this peer can
// provide. So we can stamp the group from this peer to be up to date. // provide. So we can stamp the group from this peer to be up to date.
#warning we should use tr->mTransaction->updateTS instead of time(NULL)
locked_stampPeerGroupUpdateTime(pid,grpId,tr->mTransaction->updateTS,msgItemL.size()) ; locked_stampPeerGroupUpdateTime(pid,grpId,tr->mTransaction->updateTS,msgItemL.size()) ;
} }
if(grpMeta) if(grpMeta)
@ -2481,6 +2535,10 @@ void RsGxsNetService::locked_pushGrpTransactionFromList( std::list<RsNxsItem*>&
GXSNETDEBUG_P_(peerId) << " nelems = " << reqList.size() << std::endl; GXSNETDEBUG_P_(peerId) << " nelems = " << reqList.size() << std::endl;
GXSNETDEBUG_P_(peerId) << " peerId = " << peerId << std::endl; GXSNETDEBUG_P_(peerId) << " peerId = " << peerId << std::endl;
GXSNETDEBUG_P_(peerId) << " transN = " << transN << std::endl; GXSNETDEBUG_P_(peerId) << " transN = " << transN << std::endl;
#endif
#ifdef NXS_NET_DEBUG_5
GXSNETDEBUG_P_ (peerId) << "Service " << std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " - sending group request to peer "
<< peerId << " for " << reqList.size() << " groups" << std::endl;
#endif #endif
RsNxsTransac* transac = new RsNxsTransac(mServType); RsNxsTransac* transac = new RsNxsTransac(mServType);
transac->transactFlag = RsNxsTransac::FLAG_TYPE_GRP_LIST_REQ transac->transactFlag = RsNxsTransac::FLAG_TYPE_GRP_LIST_REQ
@ -2705,6 +2763,10 @@ void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr)
if(mGrpServerUpdateItem) if(mGrpServerUpdateItem)
updateTS = mGrpServerUpdateItem->grpUpdateTS; updateTS = mGrpServerUpdateItem->grpUpdateTS;
#ifdef NXS_NET_DEBUG_5
GXSNETDEBUG_P_ (tr->mTransaction->PeerId()) << "Service " << std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " - sending global group TS "
<< updateTS << " to peer " << tr->mTransaction->PeerId() << std::endl;
#endif
RsNxsTransac* ntr = new RsNxsTransac(mServType); RsNxsTransac* ntr = new RsNxsTransac(mServType);
ntr->transactionNumber = transN; ntr->transactionNumber = transN;
ntr->transactFlag = RsNxsTransac::FLAG_BEGIN_P1 | RsNxsTransac::FLAG_TYPE_GRPS; ntr->transactFlag = RsNxsTransac::FLAG_BEGIN_P1 | RsNxsTransac::FLAG_TYPE_GRPS;
@ -2903,6 +2965,10 @@ void RsGxsNetService::locked_genSendMsgsTransaction(NxsTransaction* tr)
newTr->mTransaction->PeerId(mOwnId); newTr->mTransaction->PeerId(mOwnId);
newTr->mTimeOut = time(NULL) + mTransactionTimeOut; newTr->mTimeOut = time(NULL) + mTransactionTimeOut;
#ifdef NXS_NET_DEBUG_5
GXSNETDEBUG_PG (peerId,grpId) << "Service " << std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " - sending message update to peer "
<< peerId << " for group " << grpId << " with TS=" << nice_time_stamp(time(NULL),updateTS) <<" (secs ago)" << std::endl;
#endif
ntr->PeerId(tr->mTransaction->PeerId()); ntr->PeerId(tr->mTransaction->PeerId());
sendItem(ntr); sendItem(ntr);
@ -2975,6 +3041,10 @@ void RsGxsNetService::locked_pushGrpRespFromList(std::list<RsNxsItem*>& respList
tr->mTransaction->PeerId(mOwnId); tr->mTransaction->PeerId(mOwnId);
tr->mTimeOut = time(NULL) + mTransactionTimeOut; tr->mTimeOut = time(NULL) + mTransactionTimeOut;
// signal peer to prepare for transaction // signal peer to prepare for transaction
#ifdef NXS_NET_DEBUG_5
GXSNETDEBUG_P_ (peer) << "Service " << std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " - sending group response to peer "
<< peer << " with " << respList.size() << " groups " << std::endl;
#endif
sendItem(trItem); sendItem(trItem);
locked_addTransaction(tr); locked_addTransaction(tr);
} }
@ -3465,6 +3535,10 @@ void RsGxsNetService::locked_pushMsgRespFromList(std::list<RsNxsItem*>& itemL, c
tr->mTransaction->PeerId(mOwnId); tr->mTransaction->PeerId(mOwnId);
tr->mTimeOut = time(NULL) + mTransactionTimeOut; tr->mTimeOut = time(NULL) + mTransactionTimeOut;
#ifdef NXS_NET_DEBUG_5
GXSNETDEBUG_P_ (sslId) << "Service " << std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " - sending messages response to peer "
<< sslId << " with " << itemL.size() << " messages " << std::endl;
#endif
// signal peer to prepare for transaction // signal peer to prepare for transaction
sendItem(trItem); sendItem(trItem);

@ -151,6 +151,8 @@ public:
*/ */
virtual void subscribeStatusChanged(const RsGxsGroupId& id,bool subscribed) ; virtual void subscribeStatusChanged(const RsGxsGroupId& id,bool subscribed) ;
virtual void rejectMessage(const RsGxsMessageId& msg_id) ;
/* p3Config methods */ /* p3Config methods */
public: public:
@ -438,6 +440,7 @@ private:
void locked_stampPeerGroupUpdateTime(const RsPeerId& pid,const RsGxsGroupId& grpId,time_t tm,uint32_t n_messages) ; void locked_stampPeerGroupUpdateTime(const RsPeerId& pid,const RsGxsGroupId& grpId,time_t tm,uint32_t n_messages) ;
void cleanRejectedMessages();
private: private:
@ -476,6 +479,7 @@ private:
uint32_t mSyncTs; uint32_t mSyncTs;
uint32_t mLastKeyPublishTs; uint32_t mLastKeyPublishTs;
uint32_t mLastCleanRejectedMessages;
const uint32_t mSYNC_PERIOD; const uint32_t mSYNC_PERIOD;
int mUpdateCounter ; int mUpdateCounter ;
@ -513,6 +517,8 @@ private:
RsGxsServerGrpUpdateItem* mGrpServerUpdateItem; RsGxsServerGrpUpdateItem* mGrpServerUpdateItem;
RsServiceInfo mServiceInfo; RsServiceInfo mServiceInfo;
std::map<RsGxsMessageId,time_t> mRejectedMessages;
void debugDump(); void debugDump();
}; };

@ -127,6 +127,13 @@ public:
*/ */
virtual int sharePublishKey(const RsGxsGroupId& grpId,const std::set<RsPeerId>& peers)=0 ; virtual int sharePublishKey(const RsGxsGroupId& grpId,const std::set<RsPeerId>& peers)=0 ;
/*!
* \brief rejectMessage
* Tells the network exchange service to not download this message again, at least for some time (maybe 24h or more)
* in order to avoid cluttering the network pipe with copied of this rejected message.
* \param msgId
*/
virtual void rejectMessage(const RsGxsMessageId& msgId) =0;
}; };
#endif // RSGNP_H #endif // RSGNP_H