merged with upstream/master

This commit is contained in:
csoler 2016-01-13 20:52:55 -05:00
commit d2f56a5c53
2 changed files with 57 additions and 26 deletions

View File

@ -209,6 +209,7 @@
NXS_NET_DEBUG_5 summary of transactions (useful to just know what comes in/out)
NXS_NET_DEBUG_6
NXS_NET_DEBUG_7 encryption/decryption of transactions
***/
//#define NXS_NET_DEBUG_0 1
//#define NXS_NET_DEBUG_1 1
@ -251,6 +252,7 @@ class nullstream: public std::ostream {};
#if defined(NXS_NET_DEBUG_0) || defined(NXS_NET_DEBUG_1) || defined(NXS_NET_DEBUG_2) || defined(NXS_NET_DEBUG_3) \
|| defined(NXS_NET_DEBUG_4) || defined(NXS_NET_DEBUG_5) || defined(NXS_NET_DEBUG_6) || defined(NXS_NET_DEBUG_7)
static std::string nice_time_stamp(time_t now,time_t TS)
{
if(TS == 0)
@ -739,7 +741,7 @@ void RsGxsNetService::syncGrpStatistics()
#ifdef NXS_NET_DEBUG_6
GXSNETDEBUG__G(it->first) << " needs update. Randomly asking to some friends" << std::endl;
#endif
// randomly select 2 friends among the suppliers of this group
// randomly select GROUP_STATS_UPDATE_NB_PEERS friends among the suppliers of this group
uint32_t n = RSRandom::random_u32() % rec.suppliers.size() ;
@ -749,14 +751,15 @@ void RsGxsNetService::syncGrpStatistics()
for(uint32_t i=0;i<std::min(rec.suppliers.size(),(size_t)GROUP_STATS_UPDATE_NB_PEERS);++i)
{
// we started at a random position in the set, wrap around if the end is reached
if(rit == rec.suppliers.end())
rit = rec.suppliers.begin() ;
RsPeerId peer_id = *rit ;
++rit ;
if(online_peers.find(peer_id) != online_peers.end()) // check that the peer is online
{
if(rit == rec.suppliers.end())
rit = rec.suppliers.begin() ;
#ifdef NXS_NET_DEBUG_6
GXSNETDEBUG_PG(peer_id,it->first) << " asking friend " << peer_id << " for an update of stats for group " << it->first << std::endl;
#endif
@ -764,6 +767,7 @@ void RsGxsNetService::syncGrpStatistics()
RsNxsSyncGrpStatsItem *grs = new RsNxsSyncGrpStatsItem(mServType) ;
grs->request_type = RsNxsSyncGrpStatsItem::GROUP_INFO_TYPE_REQUEST ;
grs->grpId = it->first ;
grs->PeerId(peer_id) ;
@ -1131,8 +1135,7 @@ void RsGxsNetService::locked_createTransactionFromPending(GrpCircleIdRequestVett
#ifdef NXS_NET_DEBUG_1
GXSNETDEBUG_PG(grpPend->mPeerId,entry.mGroupId) << " Group Id: " << entry.mGroupId << " PASSED" << std::endl;
#endif
RsNxsSyncGrpItem* gItem = new
RsNxsSyncGrpItem(mServType);
RsNxsSyncGrpItem* gItem = new RsNxsSyncGrpItem(mServType);
gItem->flag = RsNxsSyncGrpItem::FLAG_RESPONSE;
gItem->grpId = entry.mGroupId;
gItem->publishTs = 0;
@ -1158,6 +1161,7 @@ void RsGxsNetService::locked_createTransactionFromPending(MsgCircleIdsRequestVet
std::list<RsNxsItem*> itemL;
uint32_t transN = locked_getTransactionId();
RsGxsGroupId grp_id ;
for(; vit != msgPend->mMsgs.end(); ++vit)
{
@ -1171,10 +1175,12 @@ void RsGxsNetService::locked_createTransactionFromPending(MsgCircleIdsRequestVet
mItem->PeerId(msgPend->mPeerId);
mItem->transactionNumber = transN;
itemL.push_back(mItem);
grp_id = msgPend->mGrpId ;
}
if(!itemL.empty())
locked_pushMsgRespFromList(itemL, msgPend->mPeerId, transN);
locked_pushMsgRespFromList(itemL, msgPend->mPeerId,grp_id, transN);
}
/*bool RsGxsNetService::locked_canReceive(const RsGxsGrpMetaData * const grpMeta
@ -2545,8 +2551,9 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
std::cerr << "(EE) stepping in part of the code (" << __PRETTY_FUNCTION__ << ") where we shouldn't. This is a bug." << std::endl;
#ifdef TO_REMOVE
locked_stampPeerGroupUpdateTime(pid,grpId,tr->mTransaction->updateTS,msgItemL.size()) ;
#endif
return ;
}
@ -2739,8 +2746,16 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
}
else
{
#ifdef NXS_NET_DEBUG_1
GXSNETDEBUG_PG(item->PeerId(),grpId) << " Request list is empty. Not doing anything. " << std::endl;
#endif
// The list to req is empty. That means we already have all messages that this peer can
// provide. So we can stamp the group from this peer to be up to date.
// Part of this is already achieved in two other places:
// - the GroupStats exchange system, which counts the messages at each peer. It could also supply TS for the messages, but it does not for the time being
// - client TS are updated when receiving messages
locked_stampPeerGroupUpdateTime(pid,grpId,tr->mTransaction->updateTS,msgItemL.size()) ;
}
}
@ -3026,8 +3041,6 @@ void RsGxsNetService::runVetting()
{
AuthorPending* ap = *vit;
// ap->accepted() looks into the reputation of the destination
if(ap->accepted() || ap->expired())
{
// add to transactions
@ -3099,6 +3112,7 @@ void RsGxsNetService::locked_genSendMsgsTransaction(NxsTransaction* tr)
GXSNETDEBUG_P_(tr->mTransaction->PeerId()) << "locked_genSendMsgsTransaction() Generating Msg data send fron TransN: " << tr->mTransaction->transactionNumber << std::endl;
#endif
<<<<<<< HEAD
// go groups requested in transaction tr
std::list<RsNxsItem*>::iterator lit = tr->mItems.begin();
@ -3176,7 +3190,6 @@ void RsGxsNetService::locked_genSendMsgsTransaction(NxsTransaction* tr)
#endif
}
}
if(newTr->mItems.empty()){
delete newTr;
return;
@ -3220,6 +3233,7 @@ void RsGxsNetService::locked_genSendMsgsTransaction(NxsTransaction* tr)
#ifdef NXS_NET_DEBUG_5
GXSNETDEBUG_PG (peerId,grpId) << "Service " << std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " - sending message update to peer " << peerId << " for group " << grpId << " with TS=" << nice_time_stamp(time(NULL),updateTS) <<" (secs ago)" << std::endl;
#endif
ntr->PeerId(tr->mTransaction->PeerId());
sendItem(ntr);
@ -3244,10 +3258,17 @@ bool RsGxsNetService::locked_addTransaction(NxsTransaction* tr)
if(transNumExist)
{
#ifdef NXS_NET_DEBUG
std::cerr << "locked_addTransaction() " << std::endl;
std::cerr << "Transaction number exist already, transN: " << transN << std::endl;
GXSNETDEBUG_P_(peer) << "locked_addTransaction() " << std::endl;
GXSNETDEBUG_P_(peer) << "Transaction number exist already, transN: " << transN << std::endl;
#endif
return false;
}else{
#ifdef NXS_NET_DEBUG_1
GXSNETDEBUG_P_(peer) << "locked_addTransaction() " << std::endl;
GXSNETDEBUG_P_(peer) << "Added transaction number " << transN << std::endl;
#endif
transMap[transN] = tr;
return true;
}
#ifdef NXS_NET_DEBUG
@ -3499,6 +3520,7 @@ bool RsGxsNetService::decryptTransaction(NxsTransaction *tr)
tr->mItems = decrypted_items ;
return true ;
}
void RsGxsNetService::cleanTransactionItems(NxsTransaction* tr) const
@ -3972,8 +3994,7 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsgReqItem *item)
{
RsGxsMsgMetaData* m = *vit;
RsNxsSyncMsgItem* mItem = new
RsNxsSyncMsgItem(mServType);
RsNxsSyncMsgItem* mItem = new RsNxsSyncMsgItem(mServType);
mItem->flag = RsNxsSyncGrpItem::FLAG_RESPONSE;
mItem->grpId = m->mGroupId;
mItem->msgId = m->mMsgId;
@ -3991,7 +4012,7 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsgReqItem *item)
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_PG(item->PeerId(),item->grpId) << " sending final msg info list of " << itemL.size() << " items." << std::endl;
#endif
locked_pushMsgRespFromList(itemL, peer, transN);
locked_pushMsgRespFromList(itemL, peer, item->grpId,transN);
}
}
#ifdef NXS_NET_DEBUG_0
@ -4005,17 +4026,18 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsgReqItem *item)
delete *vit;
}
void RsGxsNetService::locked_pushMsgRespFromList(std::list<RsNxsItem*>& itemL, const RsPeerId& sslId, const uint32_t& transN)
void RsGxsNetService::locked_pushMsgRespFromList(std::list<RsNxsItem*>& itemL, const RsPeerId& sslId, const RsGxsGroupId& grp_id,const uint32_t& transN)
{
#ifdef NXS_NET_DEBUG_1
GXSNETDEBUG_P_(sslId) << "locked_pushMsgResponseFromList()" << std::endl;
GXSNETDEBUG_P_(sslId) << " nelems = " << itemL.size() << std::endl;
GXSNETDEBUG_P_(sslId) << " peerId = " << sslId << std::endl;
GXSNETDEBUG_P_(sslId) << " transN = " << transN << std::endl;
GXSNETDEBUG_PG(sslId,grp_id) << "locked_pushMsgResponseFromList()" << std::endl;
GXSNETDEBUG_PG(sslId,grp_id) << " nelems = " << itemL.size() << std::endl;
GXSNETDEBUG_PG(sslId,grp_id) << " peerId = " << sslId << std::endl;
GXSNETDEBUG_PG(sslId,grp_id) << " transN = " << transN << std::endl;
#endif
NxsTransaction* tr = new NxsTransaction();
tr->mItems = itemL;
tr->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM;
RsNxsTransacItem* trItem = new RsNxsTransacItem(mServType);
trItem->transactFlag = RsNxsTransacItem::FLAG_BEGIN_P1 | RsNxsTransacItem::FLAG_TYPE_MSG_LIST_RESP;
trItem->nItems = itemL.size();
@ -4028,6 +4050,18 @@ void RsGxsNetService::locked_pushMsgRespFromList(std::list<RsNxsItem*>& itemL, c
tr->mTransaction->PeerId(mOwnId);
tr->mTimeOut = time(NULL) + mTransactionTimeOut;
ServerMsgMap::const_iterator cit = mServerMsgUpdateMap.find(grp_id);
// This time stamp is not supposed to be used on the other side. We just set it to avoid sending an uninitialiszed value.
if(cit != mServerMsgUpdateMap.end())
trItem->updateTS = cit->second->msgUpdateTS;
else
{
std::cerr << "(EE) cannot find a server TS for message of group " << grp_id << " in locked_pushMsgRespFromList. This is weird." << std::endl;
trItem->updateTS = 0 ;
}
#ifdef NXS_NET_DEBUG_5
GXSNETDEBUG_P_ (sslId) << "Service " << std::hex << ((mServiceInfo.mServiceType >> 8)& 0xffff) << std::dec << " - sending messages response to peer "
<< sslId << " with " << itemL.size() << " messages " << std::endl;
@ -4393,5 +4427,3 @@ void RsGxsNetService::handleRecvPublishKeys(RsNxsGroupPublishKeyItem *item)
std::cerr << "(EE) could not update database. Something went wrong." << std::endl;
}
}

View File

@ -51,7 +51,7 @@ class PgpAuxUtils;
class RsGroupNetworkStatsRecord
{
public:
RsGroupNetworkStatsRecord() { max_visible_count= 0 ; }
RsGroupNetworkStatsRecord() { max_visible_count= 0 ; update_TS=0; }
std::set<RsPeerId> suppliers ;
uint32_t max_visible_count ;
@ -364,7 +364,7 @@ private:
void locked_pushMsgTransactionFromList(std::list<RsNxsItem*>& reqList, const RsPeerId& peerId, const uint32_t& transN);
void locked_pushGrpTransactionFromList(std::list<RsNxsItem*>& reqList, const RsPeerId& peerId, const uint32_t& transN);
void locked_pushGrpRespFromList(std::list<RsNxsItem*>& respList, const RsPeerId& peer, const uint32_t& transN);
void locked_pushMsgRespFromList(std::list<RsNxsItem*>& itemL, const RsPeerId& sslId, const uint32_t& transN);
void locked_pushMsgRespFromList(std::list<RsNxsItem*>& itemL, const RsPeerId& sslId, const RsGxsGroupId &grp_id, const uint32_t& transN);
void syncWithPeers();
void syncGrpStatistics();
void addGroupItemToList(NxsTransaction*& tr,
@ -522,7 +522,6 @@ public:
typedef std::map<RsPeerId, RsGxsMsgUpdateItem*> ClientMsgMap;
typedef std::map<RsGxsGroupId, RsGxsServerMsgUpdateItem*> ServerMsgMap;
typedef std::map<RsPeerId, RsGxsGrpUpdateItem*> ClientGrpMap;
private:
ClientMsgMap mClientMsgUpdateMap;