diff --git a/libretroshare/src/gxs/rsgxsnetservice.cc b/libretroshare/src/gxs/rsgxsnetservice.cc index 84ddb2e4e..47c0ba998 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.cc +++ b/libretroshare/src/gxs/rsgxsnetservice.cc @@ -37,13 +37,23 @@ /*** * #define NXS_NET_DEBUG 1 ***/ +// #define NXS_NET_DEBUG 1 #define GIXS_CUT_OFF 0 -#define SYNC_PERIOD 60 // every 60 seconds (12 seconds for testing) -#define TRANSAC_TIMEOUT 30 // 30 seconds. Has been increased to avoid epidemic transaction cancelling due to overloaded outqueues. +// The constant below have a direct influence on how fast forums/channels/posted/identity groups propagate and on the overloading of queues: +// +// Channels/forums will update at a rate of SYNC_PERIOD*MAX_REQLIST_SIZE/60 messages per minute. +// A large TRANSAC_TIMEOUT helps large transactions to finish before anything happens (e.g. disconnexion) or when the server has low upload bandwidth, +// but also uses more memory. +// A small value for MAX_REQLIST_SIZE is likely to help messages to propagate in a chaotic network, but will also slow them down. +// A small SYNC_PERIOD fasten message propagation, but is likely to overload the server side of transactions (e.g. overload outqueues). +// +#define SYNC_PERIOD 60 +#define MAX_REQLIST_SIZE 20 // No more than 20 items per msg request list => creates smaller transactions that are less likely to be cancelled. +#define TRANSAC_TIMEOUT 2000 // In seconds. Has been increased to avoid epidemic transaction cancelling due to overloaded outqueues. - const uint32_t RsGxsNetService::FRAGMENT_SIZE = 150000; +const uint32_t RsGxsNetService::FRAGMENT_SIZE = 150000; RsGxsNetService::RsGxsNetService(uint16_t servType, RsGeneralDataService *gds, RsNxsNetMgr *netMgr, RsNxsObserver *nxsObs, @@ -202,17 +212,16 @@ private: static uint64_t total_record ; }; -uint32_t NxsBandwidthRecorder::total_events =0 ; // total number of events. Not used. +uint32_t NxsBandwidthRecorder::total_events =0 ; // total number of events. Not used. uint64_t NxsBandwidthRecorder::last_event_record = time(NULL) * 1000;// starting time of bw estimate period (in msec) -uint64_t NxsBandwidthRecorder::total_record =0 ; // total bytes recorded in the current time frame +uint64_t NxsBandwidthRecorder::total_record =0 ; // total bytes recorded in the current time frame float NxsBandwidthRecorder::estimated_required_bandwidth = 10.0f ;// Estimated BW for sending sync data. Set to 10KB/s, to avoid 0. -RsMutex NxsBandwidthRecorder::mtx("Bandwidth recorder") ; // Protects the recorder since bw events are collected from multiple GXS Net services +RsMutex NxsBandwidthRecorder::mtx("Bandwidth recorder") ; // Protects the recorder since bw events are collected from multiple GXS Net services void RsGxsNetService::syncWithPeers() { #ifdef NXS_NET_DEBUG - std::cerr << "RsGxsNetService::syncWithPeers()"; - std::cerr << std::endl; + std::cerr << "RsGxsNetService::syncWithPeers() this=" << (void*)this << std::endl; #endif static RsNxsSerialiser ser(mServType) ; // this is used to estimate bandwidth. @@ -225,9 +234,9 @@ void RsGxsNetService::syncWithPeers() std::set::iterator sit = peers.begin(); if(mGrpAutoSync) - { - // for now just grps - for(; sit != peers.end(); ++sit) + { + // for now just grps + for(; sit != peers.end(); ++sit) { const RsPeerId peerId = *sit; @@ -246,6 +255,9 @@ void RsGxsNetService::syncWithPeers() NxsBandwidthRecorder::recordEvent(mServType,grp) ; +#ifdef NXS_NET_DEBUG + std::cerr << " sending RsNxsSyncGrp item to peer id: " << *sit << " ts=" << updateTS << std::endl; +#endif sendItem(grp); } } @@ -279,7 +291,7 @@ void RsGxsNetService::syncWithPeers() float sending_probability = NxsBandwidthRecorder::computeCurrentSendingProbability() ; #ifdef NXS_NET_DEBUG - std::cerr << "syncWithPeers(): Sending probability = " << sending_probability << std::endl; + std::cerr << " syncWithPeers(): Sending probability = " << sending_probability << std::endl; #endif // synchronise group msg for groups which we're subscribed to @@ -298,40 +310,52 @@ void RsGxsNetService::syncWithPeers() { mui = cit->second; } +#ifdef NXS_NET_DEBUG + std::cerr << " syncing messages with peer " << peerId << std::endl; +#endif GrpMetaMap::const_iterator mmit = toRequest.begin(); for(; mmit != toRequest.end(); ++mmit) + { + const RsGxsGrpMetaData* meta = mmit->second; + const RsGxsGroupId& grpId = mmit->first; + + if(!checkCanRecvMsgFromPeer(peerId, *meta)) + continue; + + uint32_t updateTS = 0; + if(mui) { - const RsGxsGrpMetaData* meta = mmit->second; - const RsGxsGroupId& grpId = mmit->first; + std::map::const_iterator cit2 = mui->msgUpdateTS.find(grpId); - if(!checkCanRecvMsgFromPeer(peerId, *meta)) - continue; - - uint32_t updateTS = 0; - if(mui) + if(cit2 != mui->msgUpdateTS.end()) { - std::map::const_iterator cit2 = - mui->msgUpdateTS.find(grpId); - - if(cit2 != mui->msgUpdateTS.end()) - { - updateTS = cit2->second; - } + updateTS = cit2->second; } + } - RsNxsSyncMsg* msg = new RsNxsSyncMsg(mServType); - msg->clear(); - msg->PeerId(peerId); - msg->grpId = grpId; - msg->updateTS = updateTS; + RsNxsSyncMsg* msg = new RsNxsSyncMsg(mServType); + msg->clear(); + msg->PeerId(peerId); + msg->grpId = grpId; + msg->updateTS = updateTS; - NxsBandwidthRecorder::recordEvent(mServType,msg) ; + NxsBandwidthRecorder::recordEvent(mServType,msg) ; - if(RSRandom::random_f32() < sending_probability) - sendItem(msg); - else - delete msg ; + if(RSRandom::random_f32() < sending_probability) + { + sendItem(msg); +#ifdef NXS_NET_DEBUG + std::cerr << " sending RsNxsSyncMsg req for grpId=" << grpId << " to peer " << *sit << std::endl; +#endif + } + else + { + delete msg ; +#ifdef NXS_NET_DEBUG + std::cerr << " cancel RsNxsSyncMsg req for grpId=" << grpId << *sit << ": not enough bandwidth." << std::endl; +#endif + } } } @@ -486,15 +510,17 @@ RsNxsGrp* RsGxsNetService::deFragmentGrp(GrpFragments& grpFragments) const struct GrpFragCollate { - RsGxsGroupId mGrpId; + RsGxsGroupId mGrpId; GrpFragCollate(const RsGxsGroupId& grpId) : mGrpId(grpId){ } bool operator()(RsNxsGrp* grp) { return grp->grpId == mGrpId;} }; -void RsGxsNetService::locked_createTransactionFromPending( - MsgRespPending* msgPend) +void RsGxsNetService::locked_createTransactionFromPending( MsgRespPending* msgPend) { - MsgAuthorV::const_iterator cit = msgPend->mMsgAuthV.begin(); +#ifdef NXS_NET_DEBUG + std::cerr << "locked_createTransactionFromPending()" << std::endl; +#endif + MsgAuthorV::const_iterator cit = msgPend->mMsgAuthV.begin(); std::list reqList; uint32_t transN = locked_getTransactionId(); for(; cit != msgPend->mMsgAuthV.end(); ++cit) @@ -511,16 +537,25 @@ void RsGxsNetService::locked_createTransactionFromPending( msgItem->transactionNumber = transN; msgItem->PeerId(msgPend->mPeerId); reqList.push_back(msgItem); - } - } + } +#ifdef NXS_NET_DEBUG + else + std::cerr << " entry failed vetting: grpId=" << entry.mGrpId << ", msgId=" << entry.mMsgId << ", peerId=" << msgPend->mPeerId << std::endl; +#endif + } if(!reqList.empty()) locked_pushMsgTransactionFromList(reqList, msgPend->mPeerId, transN); +#ifdef NXS_NET_DEBUG + std::cerr << " added " << reqList.size() << " items to transaction." << std::endl; +#endif } -void RsGxsNetService::locked_createTransactionFromPending( - GrpRespPending* grpPend) +void RsGxsNetService::locked_createTransactionFromPending(GrpRespPending* grpPend) { +#ifdef NXS_NET_DEBUG + std::cerr << "locked_createTransactionFromPending() from peer " << grpPend->mPeerId << std::endl; +#endif GrpAuthorV::const_iterator cit = grpPend->mGrpAuthV.begin(); std::list reqList; uint32_t transN = locked_getTransactionId(); @@ -531,8 +566,7 @@ void RsGxsNetService::locked_createTransactionFromPending( if(entry.mPassedVetting) { #ifdef NXS_NET_DEBUG - std::cerr << "locked_createTransactionFromPending(AUTHOR VETTING) Group Id: " << entry.mGrpId << " PASSED"; - std::cerr << std::endl; + std::cerr << " entry Group Id: " << entry.mGrpId << " PASSED" << std::endl; #endif RsNxsSyncGrpItem* msgItem = new RsNxsSyncGrpItem(mServType); msgItem->grpId = entry.mGrpId; @@ -542,13 +576,10 @@ void RsGxsNetService::locked_createTransactionFromPending( msgItem->PeerId(grpPend->mPeerId); reqList.push_back(msgItem); } - else - { #ifdef NXS_NET_DEBUG - std::cerr << "locked_createTransactionFromPending(AUTHOR VETTING) Group Id: " << entry.mGrpId << " FAILED"; - std::cerr << std::endl; + else + std::cerr << " entry failed vetting: grpId=" << entry.mGrpId << ", peerId=" << grpPend->mPeerId << std::endl; #endif - } } if(!reqList.empty()) @@ -558,7 +589,10 @@ void RsGxsNetService::locked_createTransactionFromPending( void RsGxsNetService::locked_createTransactionFromPending(GrpCircleIdRequestVetting* grpPend) { - std::vector::iterator cit = grpPend->mGrpCircleV.begin(); +#ifdef NXS_NET_DEBUG + std::cerr << "locked_createTransactionFromPending(GrpCircleIdReq)" << std::endl; +#endif + std::vector::iterator cit = grpPend->mGrpCircleV.begin(); uint32_t transN = locked_getTransactionId(); std::list itemL; for(; cit != grpPend->mGrpCircleV.end(); ++cit) @@ -567,8 +601,7 @@ void RsGxsNetService::locked_createTransactionFromPending(GrpCircleIdRequestVett if(entry.mCleared) { #ifdef NXS_NET_DEBUG - std::cerr << "locked_createTransactionFromPending(CIRCLE VETTING) Group Id: " << entry.mGroupId << " PASSED"; - std::cerr << std::endl; + std::cerr << " Group Id: " << entry.mGroupId << " PASSED" << std::endl; #endif RsNxsSyncGrpItem* gItem = new RsNxsSyncGrpItem(mServType); @@ -581,14 +614,11 @@ void RsGxsNetService::locked_createTransactionFromPending(GrpCircleIdRequestVett // why it authorId not set here??? itemL.push_back(gItem); } - else - { #ifdef NXS_NET_DEBUG - std::cerr << "locked_createTransactionFromPending(CIRCLE VETTING) Group Id: " << entry.mGroupId << " FAILED"; - std::cerr << std::endl; + else + std::cerr << " Group Id: " << entry.mGroupId << " FAILED" << std::endl; #endif - } - } + } if(!itemL.empty()) locked_pushGrpRespFromList(itemL, grpPend->mPeerId, transN); @@ -750,10 +780,8 @@ class StoreHere { public: - StoreHere(RsGxsNetService::ClientGrpMap& cgm, RsGxsNetService::ClientMsgMap& cmm, - RsGxsNetService::ServerMsgMap& smm, - RsGxsServerGrpUpdateItem*& sgm) : mClientGrpMap(cgm), mClientMsgMap(cmm), - mServerMsgMap(smm), mServerGrpUpdateItem(sgm) + StoreHere(RsGxsNetService::ClientGrpMap& cgm, RsGxsNetService::ClientMsgMap& cmm, RsGxsNetService::ServerMsgMap& smm, RsGxsServerGrpUpdateItem*& sgm) + : mClientGrpMap(cgm), mClientMsgMap(cmm), mServerMsgMap(smm), mServerGrpUpdateItem(sgm) {} void operator() (RsItem* item) @@ -777,17 +805,12 @@ public: } else { -#ifdef NXS_NET_DEBUG std::cerr << "Error! More than one server group update item exists!" << std::endl; -#endif delete gsui; } } else - { std::cerr << "Type not expected!" << std::endl; - } - } private: @@ -801,9 +824,7 @@ private: bool RsGxsNetService::loadList(std::list &load) { - std::for_each(load.begin(), load.end(), StoreHere(mClientGrpUpdateMap, mClientMsgUpdateMap, - mServerMsgUpdateMap, mGrpServerUpdateItem)); - + std::for_each(load.begin(), load.end(), StoreHere(mClientGrpUpdateMap, mClientMsgUpdateMap, mServerMsgUpdateMap, mGrpServerUpdateItem)); return true; } @@ -847,46 +868,45 @@ RsSerialiser *RsGxsNetService::setupSerialiser() return rss; } -void RsGxsNetService::recvNxsItemQueue(){ - +void RsGxsNetService::recvNxsItemQueue() +{ RsItem *item ; while(NULL != (item=recvItem())) { #ifdef NXS_NET_DEBUG std::cerr << "RsGxsNetService Item:" << (void*)item << std::endl ; - item->print(std::cerr); + //item->print(std::cerr); #endif // RsNxsItem needs dynamic_cast, since they have derived siblings. // RsNxsItem *ni = dynamic_cast(item) ; if(ni != NULL) - { - // a live transaction has a non zero value - if(ni->transactionNumber != 0){ - + { + // a live transaction has a non zero value + if(ni->transactionNumber != 0) + { #ifdef NXS_NET_DEBUG - std::cerr << "recvNxsItemQueue()" << std::endl; - std::cerr << "handlingTransaction, transN" << ni->transactionNumber << std::endl; + std::cerr << " recvNxsItemQueue() handlingTransaction, transN " << ni->transactionNumber << std::endl; #endif if(!handleTransaction(ni)) delete ni; continue; - } + } - switch(ni->PacketSubType()) - { - case RS_PKT_SUBTYPE_NXS_SYNC_GRP: handleRecvSyncGroup (dynamic_cast(ni)) ; break ; - case RS_PKT_SUBTYPE_NXS_SYNC_MSG: handleRecvSyncMessage (dynamic_cast(ni)) ; break ; - case RS_PKT_SUBTYPE_NXS_GRP_PUBLISH_KEY: handleRecvPublishKeys (dynamic_cast(ni)) ; break ; - default: - std::cerr << "Unhandled item subtype " << (uint32_t) ni->PacketSubType() << " in RsGxsNetService: " << std::endl; break; - } - delete item ; - } + switch(ni->PacketSubType()) + { + case RS_PKT_SUBTYPE_NXS_SYNC_GRP: handleRecvSyncGroup (dynamic_cast(ni)) ; break ; + case RS_PKT_SUBTYPE_NXS_SYNC_MSG: handleRecvSyncMessage (dynamic_cast(ni)) ; break ; + case RS_PKT_SUBTYPE_NXS_GRP_PUBLISH_KEY: handleRecvPublishKeys (dynamic_cast(ni)) ; break ; + default: + std::cerr << "Unhandled item subtype " << (uint32_t) ni->PacketSubType() << " in RsGxsNetService: " << std::endl; break; + } + delete item ; + } else { std::cerr << "Not a RsNxsItem, deleting!" << std::endl; @@ -898,57 +918,60 @@ void RsGxsNetService::recvNxsItemQueue(){ bool RsGxsNetService::handleTransaction(RsNxsItem* item) { - - /*! - * This attempts to handle a transaction - * It first checks if this transaction id already exists - * If it does then check this not a initiating transactions - */ - - RS_STACK_MUTEX(mNxsMutex) ; - - const RsPeerId& peer = item->PeerId(); - - RsNxsTransac* transItem = dynamic_cast(item); - - // if this is a RsNxsTransac item process - if(transItem) - return locked_processTransac(transItem); - - - // then this must be transaction content to be consumed - // first check peer exist for transaction - bool peerTransExists = mTransactions.find(peer) != mTransactions.end(); - - // then check transaction exists - - bool transExists = false; - NxsTransaction* tr = NULL; - uint32_t transN = item->transactionNumber; - - if(peerTransExists) - { - TransactionIdMap& transMap = mTransactions[peer]; - - transExists = transMap.find(transN) != transMap.end(); - - if(transExists) - { - #ifdef NXS_NET_DEBUG - std::cerr << "handleTransaction() " << std::endl; - std::cerr << "Consuming Transaction content, transN: " << item->transactionNumber << std::endl; - std::cerr << "Consuming Transaction content, from Peer: " << item->PeerId() << std::endl; + std::cerr << "handleTransaction(RsNxsItem) number=" << item->transactionNumber << std::endl; #endif - tr = transMap[transN]; - tr->mItems.push_back(item); + /*! + * This attempts to handle a transaction + * It first checks if this transaction id already exists + * If it does then check this not a initiating transactions + */ - return true; - } - } + RS_STACK_MUTEX(mNxsMutex) ; - return false; + const RsPeerId& peer = item->PeerId(); + + RsNxsTransac* transItem = dynamic_cast(item); + + // if this is a RsNxsTransac item process + if(transItem) + { +#ifdef NXS_NET_DEBUG + std::cerr << " this is a RsNxsTransac item. callign process." << std::endl; +#endif + return locked_processTransac(transItem); + } + + + // then this must be transaction content to be consumed + // first check peer exist for transaction + bool peerTransExists = mTransactions.find(peer) != mTransactions.end(); + + // then check transaction exists + + NxsTransaction* tr = NULL; + uint32_t transN = item->transactionNumber; + + if(peerTransExists) + { + TransactionIdMap& transMap = mTransactions[peer]; + + if(transMap.find(transN) != transMap.end()) + { +#ifdef NXS_NET_DEBUG + std::cerr << " Consuming Transaction content, transN: " << item->transactionNumber << std::endl; + std::cerr << " Consuming Transaction content, from Peer: " << item->PeerId() << std::endl; +#endif + + tr = transMap[transN]; + tr->mItems.push_back(item); + + return true; + } + } + + return false; } bool RsGxsNetService::locked_processTransac(RsNxsTransac* item) @@ -984,52 +1007,72 @@ bool RsGxsNetService::locked_processTransac(RsNxsTransac* item) #ifdef NXS_NET_DEBUG std::cerr << "locked_processTransac() " << std::endl; - std::cerr << "locked_processTransac(), Received transaction item: " << transN << std::endl; - std::cerr << "locked_processTransac(), With peer: " << item->PeerId() << std::endl; - std::cerr << "locked_processTransac(), trans type: " << item->transactFlag << std::endl; + std::cerr << " Received transaction item: " << transN << std::endl; + std::cerr << " With peer: " << item->PeerId() << std::endl; + std::cerr << " trans type: " << item->transactFlag << std::endl; #endif bool peerTrExists = mTransactions.find(peer) != mTransactions.end(); bool transExists = false; - if(peerTrExists){ - + if(peerTrExists) + { TransactionIdMap& transMap = mTransactions[peer]; // record whether transaction exists already transExists = transMap.find(transN) != transMap.end(); - } // initiating an incoming transaction - if(item->transactFlag & RsNxsTransac::FLAG_BEGIN_P1){ + if(item->transactFlag & RsNxsTransac::FLAG_BEGIN_P1) + { +#ifdef NXS_NET_DEBUG + std::cerr << " initiating Incoming transaction." << std::endl; +#endif - if(transExists) - return false; // should not happen! + if(transExists) + { +#ifdef NXS_NET_DEBUG + std::cerr << " transaction already exist! ERROR" << std::endl; +#endif + return false; // should not happen! + } - // create a transaction if the peer does not exist - if(!peerTrExists){ - mTransactions[peer] = TransactionIdMap(); - } + // create a transaction if the peer does not exist + if(!peerTrExists) + mTransactions[peer] = TransactionIdMap(); - TransactionIdMap& transMap = mTransactions[peer]; + TransactionIdMap& transMap = mTransactions[peer]; - // create new transaction - tr = new NxsTransaction(); - transMap[transN] = tr; - tr->mTransaction = item; + // create new transaction + tr = new NxsTransaction(); + transMap[transN] = tr; + tr->mTransaction = item; tr->mTimeOut = item->timestamp + mTransactionTimeOut; +#ifdef NXS_NET_DEBUG + std::cerr << " Setting timeout of " << mTransactionTimeOut << " secs, which is " << tr->mTimeOut - time(NULL) << " secs from now." << std::endl; +#endif - // note state as receiving, commencement item - // is sent on next run() loop - tr->mFlag = NxsTransaction::FLAG_STATE_STARTING; + // note state as receiving, commencement item + // is sent on next run() loop + tr->mFlag = NxsTransaction::FLAG_STATE_STARTING; return true; - // commencement item for outgoing transaction - }else if(item->transactFlag & RsNxsTransac::FLAG_BEGIN_P2){ + // commencement item for outgoing transaction + } + else if(item->transactFlag & RsNxsTransac::FLAG_BEGIN_P2) + { +#ifdef NXS_NET_DEBUG + std::cerr << " initiating outgoign transaction." << std::endl; +#endif + // transaction must exist + if(!peerTrExists || !transExists) + { +#ifdef NXS_NET_DEBUG + std::cerr << " transaction does not exist. Cancelling!" << std::endl; +#endif - // transaction must exist - if(!peerTrExists || !transExists) - return false; + return false; + } // alter state so transaction content is sent on @@ -1040,12 +1083,21 @@ bool RsGxsNetService::locked_processTransac(RsNxsTransac* item) delete item; return true; // end transac item for outgoing transaction - }else if(item->transactFlag & RsNxsTransac::FLAG_END_SUCCESS){ + } + else if(item->transactFlag & RsNxsTransac::FLAG_END_SUCCESS) + { - // transaction does not exist - if(!peerTrExists || !transExists){ - return false; - } +#ifdef NXS_NET_DEBUG + std::cerr << " marking this transaction succeed" << std::endl; +#endif + // transaction does not exist + if(!peerTrExists || !transExists) + { +#ifdef NXS_NET_DEBUG + std::cerr << " transaction does not exist. Cancelling!" << std::endl; +#endif + return false; + } // alter state so that transaction is removed // on next run() loop @@ -1150,191 +1202,236 @@ bool RsGxsNetService::locked_checkTransacTimedOut(NxsTransaction* tr) return tr->mTimeOut < ((uint32_t) time(NULL)); } -void RsGxsNetService::processTransactions(){ +void RsGxsNetService::processTransactions() +{ +#ifdef NXS_NET_DEBUG + if(!mTransactions.empty()) + std::cerr << "processTransactions()" << std::endl; +#endif + RS_STACK_MUTEX(mNxsMutex) ; - RS_STACK_MUTEX(mNxsMutex) ; + TransactionsPeerMap::iterator mit = mTransactions.begin(); - TransactionsPeerMap::iterator mit = mTransactions.begin(); - - for(; mit != mTransactions.end(); ++mit){ - - TransactionIdMap& transMap = mit->second; - TransactionIdMap::iterator mmit = transMap.begin(), - - mmit_end = transMap.end(); - - // transaction to be removed - std::list toRemove; - - /*! - * Transactions owned by peer - */ - if(mit->first == mOwnId){ - - for(; mmit != mmit_end; ++mmit){ - - NxsTransaction* tr = mmit->second; - uint16_t flag = tr->mFlag; - std::list::iterator lit, lit_end; - uint32_t transN = tr->mTransaction->transactionNumber; - - // first check transaction has not expired - if(locked_checkTransacTimedOut(tr)) - { - std::cerr << std::dec ; - int total_transaction_time = (int)time(NULL) - (tr->mTimeOut - mTransactionTimeOut) ; - std::cerr << "Outgoing Transaction has failed, tranN: " << transN << ", Peer: " << mit->first ; - std::cerr << ", age: " << total_transaction_time << ", nItems=" << tr->mTransaction->nItems << ". tr->mTimeOut = " << tr->mTimeOut << ", now = " << (uint32_t) time(NULL) << std::endl; - - tr->mFlag = NxsTransaction::FLAG_STATE_FAILED; - toRemove.push_back(transN); - mComplTransactions.push_back(tr); - continue; - } - - // send items requested - if(flag & NxsTransaction::FLAG_STATE_SENDING){ + for(; mit != mTransactions.end(); ++mit) + { + TransactionIdMap& transMap = mit->second; + TransactionIdMap::iterator mmit = transMap.begin(), mmit_end = transMap.end(); #ifdef NXS_NET_DEBUG - std::cerr << "processTransactions() " << std::endl; - std::cerr << "Sending Transaction content, transN: " << transN << std::endl; - std::cerr << "with peer: " << tr->mTransaction->PeerId(); + if(mmit != mmit_end) + std::cerr << " peerId=" << mit->first << std::endl; #endif - lit = tr->mItems.begin(); - lit_end = tr->mItems.end(); + // transaction to be removed + std::list toRemove; - for(; lit != lit_end; ++lit){ - sendItem(*lit); - } + /*! + * Transactions owned by peer + */ + if(mit->first == mOwnId) + { + for(; mmit != mmit_end; ++mmit) + { +#ifdef NXS_NET_DEBUG + std::cerr << " type: outgoing " << std::endl; + std::cerr << " transN = " << mmit->second->mTransaction->transactionNumber << std::endl; +#endif + NxsTransaction* tr = mmit->second; + uint16_t flag = tr->mFlag; + std::list::iterator lit, lit_end; + uint32_t transN = tr->mTransaction->transactionNumber; - tr->mItems.clear(); // clear so they don't get deleted in trans cleaning - tr->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM; + // first check transaction has not expired + if(locked_checkTransacTimedOut(tr)) + { +#ifdef NXS_NET_DEBUG + std::cerr << " timeout! " << std::endl; + std::cerr << std::dec ; + int total_transaction_time = (int)time(NULL) - (tr->mTimeOut - mTransactionTimeOut) ; + std::cerr << " Outgoing Transaction has failed, tranN: " << transN << ", Peer: " << mit->first ; + std::cerr << ", age: " << total_transaction_time << ", nItems=" << tr->mTransaction->nItems << ". tr->mTimeOut = " << tr->mTimeOut << ", now = " << (uint32_t) time(NULL) << std::endl; +#endif - }else if(flag & NxsTransaction::FLAG_STATE_WAITING_CONFIRM){ - continue; + tr->mFlag = NxsTransaction::FLAG_STATE_FAILED; + toRemove.push_back(transN); + mComplTransactions.push_back(tr); + continue; + } +#ifdef NXS_NET_DEBUG + else + std::cerr << " still on time." << std::endl; +#endif - }else if(flag & NxsTransaction::FLAG_STATE_COMPLETED){ + // send items requested + if(flag & NxsTransaction::FLAG_STATE_SENDING) + { +#ifdef NXS_NET_DEBUG + std::cerr << " Sending Transaction content, transN: " << transN << " with peer: " << tr->mTransaction->PeerId() << std::endl; +#endif + lit = tr->mItems.begin(); + lit_end = tr->mItems.end(); + + for(; lit != lit_end; ++lit){ + sendItem(*lit); + } + + tr->mItems.clear(); // clear so they don't get deleted in trans cleaning + tr->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM; + + } + else if(flag & NxsTransaction::FLAG_STATE_WAITING_CONFIRM) + { +#ifdef NXS_NET_DEBUG + std::cerr << " Waiting confirm! returning." << std::endl; +#endif + continue; + + } + else if(flag & NxsTransaction::FLAG_STATE_COMPLETED) + { #ifdef NXS_NET_DEBUG int total_transaction_time = (int)time(NULL) - (tr->mTimeOut - mTransactionTimeOut) ; - std::cerr << "RsGxsNetService::processTransactions() outgoing completed " << tr->mTransaction->nItems << " items transaction in " << total_transaction_time << " seconds." << std::endl; + std::cerr << " Outgoing completed " << tr->mTransaction->nItems << " items transaction in " << total_transaction_time << " seconds." << std::endl; #endif // move to completed transactions - toRemove.push_back(transN); - mComplTransactions.push_back(tr); - }else{ + toRemove.push_back(transN); + mComplTransactions.push_back(tr); + }else{ #ifdef NXS_NET_DEBUG - std::cerr << "processTransactions() " << std::endl; - std::cerr << "processTransactions(), Unknown flag for active transaction, transN: " << transN - << std::endl; - std::cerr << "processTransactions(), Unknown flag, Peer: " << mit->first; + std::cerr << " Unknown flag for active transaction, transN: " << transN << ", Peer: " << mit->first<< std::endl; #endif - toRemove.push_back(transN); - tr->mFlag = NxsTransaction::FLAG_STATE_FAILED; - mComplTransactions.push_back(tr); - } - } + toRemove.push_back(transN); + tr->mFlag = NxsTransaction::FLAG_STATE_FAILED; + mComplTransactions.push_back(tr); + } + } - }else{ + }else{ - /*! - * Essentially these are incoming transactions - * Several states are dealth with - * Receiving: waiting to receive items from peer's transaction - * and checking if all have been received - * Completed: remove transaction from active and tell peer - * involved in transaction - * Starting: this is a new transaction and need to teell peer - * involved in transaction - */ + /*! + * Essentially these are incoming transactions + * Several states are dealth with + * Receiving: waiting to receive items from peer's transaction + * and checking if all have been received + * Completed: remove transaction from active and tell peer + * involved in transaction + * Starting: this is a new transaction and need to teell peer + * involved in transaction + */ - for(; mmit != mmit_end; ++mmit){ + for(; mmit != mmit_end; ++mmit){ - NxsTransaction* tr = mmit->second; - uint16_t flag = tr->mFlag; - uint32_t transN = tr->mTransaction->transactionNumber; + NxsTransaction* tr = mmit->second; + uint16_t flag = tr->mFlag; + uint32_t transN = tr->mTransaction->transactionNumber; - // first check transaction has not expired - if(locked_checkTransacTimedOut(tr)) - { - std::cerr << std::dec ; - int total_transaction_time = (int)time(NULL) - (tr->mTimeOut - mTransactionTimeOut) ; - std::cerr << "Incoming Transaction has failed, tranN: " << transN << ", Peer: " << mit->first ; - std::cerr << ", age: " << total_transaction_time << ", nItems=" << tr->mTransaction->nItems << ". tr->mTimeOut = " << tr->mTimeOut << ", now = " << (uint32_t) time(NULL) << std::endl; +#ifdef NXS_NET_DEBUG + std::cerr << " type: incoming " << std::endl; + std::cerr << " transN = " << mmit->second->mTransaction->transactionNumber << std::endl; +#endif + // first check transaction has not expired + if(locked_checkTransacTimedOut(tr)) + { +#ifdef NXS_NET_DEBUG + std::cerr << " timeout!" << std::endl; + std::cerr << std::dec ; + int total_transaction_time = (int)time(NULL) - (tr->mTimeOut - mTransactionTimeOut) ; + std::cerr << " Incoming Transaction has failed, tranN: " << transN << ", Peer: " << mit->first ; + std::cerr << ", age: " << total_transaction_time << ", nItems=" << tr->mTransaction->nItems << ". tr->mTimeOut = " << tr->mTimeOut << ", now = " << (uint32_t) time(NULL) << std::endl; +#endif - tr->mFlag = NxsTransaction::FLAG_STATE_FAILED; - toRemove.push_back(transN); - mComplTransactions.push_back(tr); - continue; - } + tr->mFlag = NxsTransaction::FLAG_STATE_FAILED; + toRemove.push_back(transN); + mComplTransactions.push_back(tr); + continue; + } - if(flag & NxsTransaction::FLAG_STATE_RECEIVING){ + if(flag & NxsTransaction::FLAG_STATE_RECEIVING) + { +#ifdef NXS_NET_DEBUG + std::cerr << " received " << tr->mItems.size() << " item over a total of " << tr->mTransaction->nItems << std::endl; +#endif - // if the number it item received equal that indicated - // then transaction is marked as completed - // to be moved to complete transations - // check if done + // if the number it item received equal that indicated + // then transaction is marked as completed + // to be moved to complete transations + // check if done if(tr->mItems.size() == tr->mTransaction->nItems) - tr->mFlag = NxsTransaction::FLAG_STATE_COMPLETED; - - }else if(flag & NxsTransaction::FLAG_STATE_COMPLETED) - { - - // send completion msg - RsNxsTransac* trans = new RsNxsTransac(mServType); - trans->clear(); - trans->transactFlag = RsNxsTransac::FLAG_END_SUCCESS; - trans->transactionNumber = transN; - trans->PeerId(tr->mTransaction->PeerId()); - sendItem(trans); - - // move to completed transactions - mComplTransactions.push_back(tr); + { + tr->mFlag = NxsTransaction::FLAG_STATE_COMPLETED; #ifdef NXS_NET_DEBUG - int total_transaction_time = (int)time(NULL) - (tr->mTimeOut - mTransactionTimeOut) ; - std::cerr << "RsGxsNetService::processTransactions() incoming completed " << tr->mTransaction->nItems << " items transaction in " << total_transaction_time << " seconds." << std::endl; + std::cerr << " completed!" << std::endl; +#endif + } + + }else if(flag & NxsTransaction::FLAG_STATE_COMPLETED) + { +#ifdef NXS_NET_DEBUG + std::cerr << " transaction is completed!" << std::endl; + std::cerr << " sending success!" << std::endl; #endif - // transaction processing done - // for this id, add to removal list - toRemove.push_back(mmit->first); - }else if(flag & NxsTransaction::FLAG_STATE_STARTING){ + // send completion msg + RsNxsTransac* trans = new RsNxsTransac(mServType); + trans->clear(); + trans->transactFlag = RsNxsTransac::FLAG_END_SUCCESS; + trans->transactionNumber = transN; + trans->PeerId(tr->mTransaction->PeerId()); + sendItem(trans); - // send item to tell peer your are ready to start - RsNxsTransac* trans = new RsNxsTransac(mServType); - trans->clear(); - trans->transactFlag = RsNxsTransac::FLAG_BEGIN_P2 | - (tr->mTransaction->transactFlag & RsNxsTransac::FLAG_TYPE_MASK); - trans->transactionNumber = transN; - trans->PeerId(tr->mTransaction->PeerId()); - sendItem(trans); - tr->mFlag = NxsTransaction::FLAG_STATE_RECEIVING; + // move to completed transactions + mComplTransactions.push_back(tr); +#ifdef NXS_NET_DEBUG + int total_transaction_time = (int)time(NULL) - (tr->mTimeOut - mTransactionTimeOut) ; + std::cerr << " incoming completed " << tr->mTransaction->nItems << " items transaction in " << total_transaction_time << " seconds." << std::endl; +#endif - } - else{ + // transaction processing done + // for this id, add to removal list + toRemove.push_back(mmit->first); + } + else if(flag & NxsTransaction::FLAG_STATE_STARTING) + { +#ifdef NXS_NET_DEBUG + std::cerr << " transaction is starting!" << std::endl; + std::cerr << " setting state to Receiving" << std::endl; +#endif + // send item to tell peer your are ready to start + RsNxsTransac* trans = new RsNxsTransac(mServType); + trans->clear(); + trans->transactFlag = RsNxsTransac::FLAG_BEGIN_P2 | + (tr->mTransaction->transactFlag & RsNxsTransac::FLAG_TYPE_MASK); + trans->transactionNumber = transN; + trans->PeerId(tr->mTransaction->PeerId()); + sendItem(trans); + tr->mFlag = NxsTransaction::FLAG_STATE_RECEIVING; - std::cerr << "processTransactions() " << std::endl; - std::cerr << "processTransactions(), Unknown flag for active transaction, transN: " << transN - << std::endl; - std::cerr << "processTransactions(), Unknown flag, Peer: " << mit->first; - toRemove.push_back(mmit->first); - mComplTransactions.push_back(tr); - tr->mFlag = NxsTransaction::FLAG_STATE_FAILED; // flag as a failed transaction - } - } - } + } + else{ +#ifdef NXS_NET_DEBUG + std::cerr << " transaction is in unknown state. ERROR!" << std::endl; + std::cerr << " transaction FAILS!" << std::endl; +#endif - std::list::iterator lit = toRemove.begin(); + std::cerr << " Unknown flag for active transaction, transN: " << transN << ", Peer: " << mit->first << std::endl; + toRemove.push_back(mmit->first); + mComplTransactions.push_back(tr); + tr->mFlag = NxsTransaction::FLAG_STATE_FAILED; // flag as a failed transaction + } + } + } - for(; lit != toRemove.end(); ++lit) - { - transMap.erase(*lit); - } + std::list::iterator lit = toRemove.begin(); - } + for(; lit != toRemove.end(); ++lit) + { + transMap.erase(*lit); + } + + } } int RsGxsNetService::getGroupPopularity(const RsGxsGroupId& gid) @@ -1356,8 +1453,6 @@ void RsGxsNetService::processCompletedTransactions() * Depending on transaction we may have to respond to peer * responsible for transaction */ - std::list::iterator lit = mComplTransactions.begin(); - while(mComplTransactions.size()>0) { @@ -1379,141 +1474,185 @@ void RsGxsNetService::processCompletedTransactions() void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr) { - uint16_t flag = tr->mTransaction->transactFlag; - if(tr->mFlag & NxsTransaction::FLAG_STATE_COMPLETED){ - // for a completed list response transaction - // one needs generate requests from this - if(flag & RsNxsTransac::FLAG_TYPE_MSG_LIST_RESP) - { - // generate request based on a peers response - locked_genReqMsgTransaction(tr); +#ifdef NXS_NET_DEBUG + std::cerr << "Processing complete Incoming transaction with " << tr->mTransaction->nItems << " items." << std::endl; + std::cerr << " flags = " << flag << std::endl; + std::cerr << " peerId= " << tr->mTransaction->PeerId() << std::endl; +#endif + if(tr->mFlag & NxsTransaction::FLAG_STATE_COMPLETED) + { +#ifdef NXS_NET_DEBUG + std::cerr << " transaction has completed." << std::endl; +#endif + // for a completed list response transaction + // one needs generate requests from this + if(flag & RsNxsTransac::FLAG_TYPE_MSG_LIST_RESP) + { +#ifdef NXS_NET_DEBUG + std::cerr << " type = msg list response." << std::endl; + std::cerr << " => generate msg request based on it." << std::endl; +#endif + // generate request based on a peers response + locked_genReqMsgTransaction(tr); - }else if(flag & RsNxsTransac::FLAG_TYPE_GRP_LIST_RESP) - { - locked_genReqGrpTransaction(tr); - } - // you've finished receiving request information now gen - else if(flag & RsNxsTransac::FLAG_TYPE_MSG_LIST_REQ) - { - locked_genSendMsgsTransaction(tr); - } - else if(flag & RsNxsTransac::FLAG_TYPE_GRP_LIST_REQ) - { - locked_genSendGrpsTransaction(tr); - } - else if(flag & RsNxsTransac::FLAG_TYPE_GRPS) - { + }else if(flag & RsNxsTransac::FLAG_TYPE_GRP_LIST_RESP) + { +#ifdef NXS_NET_DEBUG + std::cerr << " type = grp list response." << std::endl; + std::cerr << " => generate group transaction request based on it." << std::endl; +#endif + locked_genReqGrpTransaction(tr); + } + // you've finished receiving request information now gen + else if(flag & RsNxsTransac::FLAG_TYPE_MSG_LIST_REQ) + { +#ifdef NXS_NET_DEBUG + std::cerr << " type = msg list request." << std::endl; + std::cerr << " => generate msg list based on it." << std::endl; +#endif + locked_genSendMsgsTransaction(tr); + } + else if(flag & RsNxsTransac::FLAG_TYPE_GRP_LIST_REQ) + { +#ifdef NXS_NET_DEBUG + std::cerr << " type = grp list request." << std::endl; + std::cerr << " => generate grp list based on it." << std::endl; +#endif + locked_genSendGrpsTransaction(tr); + } + else if(flag & RsNxsTransac::FLAG_TYPE_GRPS) + { +#ifdef NXS_NET_DEBUG + std::cerr << " type = groups." << std::endl; +#endif + std::vector grps; - std::list::iterator lit = tr->mItems.begin(); - std::vector grps; + while(tr->mItems.size() != 0) + { + RsNxsGrp* grp = dynamic_cast(tr->mItems.front()); - while(tr->mItems.size() != 0) - { - RsNxsGrp* grp = dynamic_cast(tr->mItems.front()); + if(grp) + { + tr->mItems.pop_front(); + grps.push_back(grp); +#ifdef NXS_NET_DEBUG + std::cerr << " pushing new group " << grp->grpId << " to list." << std::endl; +#endif + } + else + { +#ifdef NXS_NET_DEBUG + std::cerr << " /!\\ item did not caste to grp" << std::endl; +#endif + } + } - if(grp) - { - tr->mItems.pop_front(); - grps.push_back(grp); +#ifdef NXS_NET_DEBUG + std::cerr << " notifying observer " << std::endl; +#endif + // notify listener of grps + mObserver->notifyNewGroups(grps); - } - else - { - #ifdef NXS_NET_DEBUG - std::cerr << "RsGxsNetService::processCompletedTransactions(): item did not caste to grp" - << std::endl; - #endif - } - } + // now note this as the latest you've received from this peer + RsPeerId peerFrom = tr->mTransaction->PeerId(); + uint32_t updateTS = tr->mTransaction->updateTS; - // notify listener of grps - mObserver->notifyNewGroups(grps); + ClientGrpMap::iterator it = mClientGrpUpdateMap.find(peerFrom); - // now note this as the latest you've received from this peer - RsPeerId peerFrom = tr->mTransaction->PeerId(); - uint32_t updateTS = tr->mTransaction->updateTS; + RsGxsGrpUpdateItem* item = NULL; - ClientGrpMap::iterator it = mClientGrpUpdateMap.find(peerFrom); + if(it != mClientGrpUpdateMap.end()) + { + item = it->second; + }else + { + item = new RsGxsGrpUpdateItem(mServType); + mClientGrpUpdateMap.insert( + std::make_pair(peerFrom, item)); + } - RsGxsGrpUpdateItem* item = NULL; + item->grpUpdateTS = updateTS; + item->peerId = peerFrom; - if(it != mClientGrpUpdateMap.end()) - { - item = it->second; - }else - { - item = new RsGxsGrpUpdateItem(mServType); - mClientGrpUpdateMap.insert( - std::make_pair(peerFrom, item)); - } - - item->grpUpdateTS = updateTS; - item->peerId = peerFrom; - - IndicateConfigChanged(); + IndicateConfigChanged(); - }else if(flag & RsNxsTransac::FLAG_TYPE_MSGS) - { + }else if(flag & RsNxsTransac::FLAG_TYPE_MSGS) + { - std::vector msgs; + std::vector msgs; +#ifdef NXS_NET_DEBUG + std::cerr << " type = msgs." << std::endl; +#endif + RsGxsGroupId grpId; + while(tr->mItems.size() > 0) + { + RsNxsMsg* msg = dynamic_cast(tr->mItems.front()); + if(msg) + { + if(grpId.isNull()) + grpId = msg->grpId; - RsGxsGroupId grpId; - while(tr->mItems.size() > 0) - { - RsNxsMsg* msg = dynamic_cast(tr->mItems.front()); - if(msg) - { - if(grpId.isNull()) - grpId = msg->grpId; - - tr->mItems.pop_front(); - msgs.push_back(msg); - } - else - { - #ifdef NXS_NET_DEBUG - std::cerr << "RsGxsNetService::processCompletedTransactions(): item did not caste to msg" - << std::endl; - #endif - } - } + tr->mItems.pop_front(); + msgs.push_back(msg); +#ifdef NXS_NET_DEBUG + std::cerr << " pushing grpId="<< msg->grpId << ", msgsId=" << msg->msgId << std::endl; +#endif + } + else + { +#ifdef NXS_NET_DEBUG + std::cerr << "RsGxsNetService::processCompletedTransactions(): item did not caste to msg" + << std::endl; +#endif + } + } #ifdef NSXS_FRAG - std::map collatedMsgs; - collateMsgFragments(msgs, collatedMsgs); + std::map collatedMsgs; + collateMsgFragments(msgs, collatedMsgs); - msgs.clear(); + msgs.clear(); - std::map::iterator mit = collatedMsgs.begin(); - for(; mit != collatedMsgs.end(); ++mit) - { - MsgFragments& f = mit->second; - RsNxsMsg* msg = deFragmentMsg(f); + std::map::iterator mit = collatedMsgs.begin(); + for(; mit != collatedMsgs.end(); ++mit) + { + MsgFragments& f = mit->second; + RsNxsMsg* msg = deFragmentMsg(f); - if(msg) - msgs.push_back(msg); - } + if(msg) + msgs.push_back(msg); + } #endif - // notify listener of msgs - mObserver->notifyNewMessages(msgs); +#ifdef NXS_NET_DEBUG + std::cerr << " notifying observer of " << msgs.size() << " new messages." << std::endl; +#endif + // notify listener of msgs + mObserver->notifyNewMessages(msgs); - // now note that this is the latest you've received from this peer - // for the grp id - locked_doMsgUpdateWork(tr->mTransaction, grpId); + // now note that this is the latest you've received from this peer + // for the grp id + locked_doMsgUpdateWork(tr->mTransaction, grpId); - } - }else if(tr->mFlag == NxsTransaction::FLAG_STATE_FAILED){ - // don't do anything transaction will simply be cleaned - } + } + } + else if(tr->mFlag == NxsTransaction::FLAG_STATE_FAILED) + { +#ifdef NXS_NET_DEBUG + std::cerr << " transaction has failed. Wasting it." << std::endl; +#endif + // don't do anything transaction will simply be cleaned + } return; } void RsGxsNetService::locked_doMsgUpdateWork(const RsNxsTransac *nxsTrans, const RsGxsGroupId &grpId) { - +#ifdef NXS_NET_DEBUG + std::cerr << "updating MsgUpdate time stamps for peerId=" << nxsTrans->PeerId() << ", grpId=" << grpId << std::endl; +#endif // firts check if peer exists const RsPeerId& peerFrom = nxsTrans->PeerId(); @@ -1528,36 +1667,52 @@ void RsGxsNetService::locked_doMsgUpdateWork(const RsNxsTransac *nxsTrans, const } else { +#ifdef NXS_NET_DEBUG + std::cerr << " created new entry." << std::endl; +#endif mui = new RsGxsMsgUpdateItem(mServType); mClientMsgUpdateMap.insert(std::make_pair(peerFrom, mui)); } - mui->msgUpdateTS[grpId] = nxsTrans->updateTS; mui->peerId = peerFrom; + if(mPartialMsgUpdates[peerFrom].find(grpId) != mPartialMsgUpdates[peerFrom].end()) + { +#ifdef NXS_NET_DEBUG + std::cerr << " this is a partial update. Not using new time stamp." << std::endl; +#endif + } + else + { +#ifdef NXS_NET_DEBUG + std::cerr << " this is a full update. Updating time stamp." << std::endl; +#endif + mui->msgUpdateTS[grpId] = nxsTrans->updateTS; IndicateConfigChanged(); + } } void RsGxsNetService::locked_processCompletedOutgoingTrans(NxsTransaction* tr) { uint16_t flag = tr->mTransaction->transactFlag; - if(tr->mFlag & NxsTransaction::FLAG_STATE_COMPLETED){ +#ifdef NXS_NET_DEBUG + std::cerr << "locked_processCompletedOutgoingTrans(): tr->flags = " << flag << std::endl; +#endif + + if(tr->mFlag & NxsTransaction::FLAG_STATE_COMPLETED) + { // for a completed list response transaction // one needs generate requests from this if(flag & RsNxsTransac::FLAG_TYPE_MSG_LIST_RESP) { #ifdef NXS_NET_DEBUG - std::cerr << "processCompletedOutgoingTrans()" << std::endl; - std::cerr << "complete Sending Msg List Response, transN: " << - tr->mTransaction->transactionNumber << std::endl; + std::cerr << " complete Sending Msg List Response, transN: " << tr->mTransaction->transactionNumber << std::endl; #endif }else if(flag & RsNxsTransac::FLAG_TYPE_GRP_LIST_RESP) { #ifdef NXS_NET_DEBUG - std::cerr << "processCompletedOutgoingTrans()" << std::endl; - std::cerr << "complete Sending Grp Response, transN: " << - tr->mTransaction->transactionNumber << std::endl; + std::cerr << " complete Sending Grp Response, transN: " << tr->mTransaction->transactionNumber << std::endl; #endif } // you've finished sending a request so don't do anything @@ -1565,223 +1720,283 @@ void RsGxsNetService::locked_processCompletedOutgoingTrans(NxsTransaction* tr) (flag & RsNxsTransac::FLAG_TYPE_GRP_LIST_REQ) ) { #ifdef NXS_NET_DEBUG - std::cerr << "processCompletedOutgoingTrans()" << std::endl; - std::cerr << "complete Sending Msg/Grp Request, transN: " << - tr->mTransaction->transactionNumber << std::endl; + std::cerr << " complete Sending Msg/Grp Request, transN: " << tr->mTransaction->transactionNumber << std::endl; #endif }else if(flag & RsNxsTransac::FLAG_TYPE_GRPS) { #ifdef NXS_NET_DEBUG - std::cerr << "processCompletedOutgoingTrans()" << std::endl; - std::cerr << "complete Sending Grp Data, transN: " << - tr->mTransaction->transactionNumber << std::endl; + std::cerr << " complete Sending Grp Data, transN: " << tr->mTransaction->transactionNumber << std::endl; #endif }else if(flag & RsNxsTransac::FLAG_TYPE_MSGS) { #ifdef NXS_NET_DEBUG - std::cerr << "processCompletedOutgoingTrans()" << std::endl; - std::cerr << "complete Sending Msg Data, transN: " << - tr->mTransaction->transactionNumber << std::endl; + std::cerr << " complete Sending Msg Data, transN: " << tr->mTransaction->transactionNumber << std::endl; #endif } }else if(tr->mFlag == NxsTransaction::FLAG_STATE_FAILED){ #ifdef NXS_NET_DEBUG - std::cerr << "processCompletedOutgoingTrans()" << std::endl; - std::cerr << "Failed transaction! transN: " << - tr->mTransaction->transactionNumber << std::endl; + std::cerr << " Failed transaction! transN: " << tr->mTransaction->transactionNumber << std::endl; #endif }else{ #ifdef NXS_NET_DEBUG - std::cerr << "processCompletedOutgoingTrans()" << std::endl; - std::cerr << "Serious error unrecognised trans Flag! transN: " << - tr->mTransaction->transactionNumber << std::endl; + std::cerr << " Serious error unrecognised trans Flag! transN: " << tr->mTransaction->transactionNumber << std::endl; #endif } } -void RsGxsNetService::locked_pushMsgTransactionFromList( - std::list& reqList, const RsPeerId& peerId, const uint32_t& transN) +void RsGxsNetService::locked_pushMsgTransactionFromList(std::list& reqList, const RsPeerId& peerId, const uint32_t& transN) { - RsNxsTransac* transac = new RsNxsTransac(mServType); - transac->transactFlag = RsNxsTransac::FLAG_TYPE_MSG_LIST_REQ - | RsNxsTransac::FLAG_BEGIN_P1; - transac->timestamp = 0; - transac->nItems = reqList.size(); - transac->PeerId(peerId); - transac->transactionNumber = transN; - NxsTransaction* newTrans = new NxsTransaction(); - newTrans->mItems = reqList; - newTrans->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM; +#ifdef NXS_NET_DEBUG + std::cerr << "locked_pushMsgTransactionFromList()" << std::endl; + std::cerr << " nelems = " << reqList.size() << std::endl; + std::cerr << " peerId = " << peerId << std::endl; + std::cerr << " transN = " << transN << std::endl; +#endif + RsNxsTransac* transac = new RsNxsTransac(mServType); + transac->transactFlag = RsNxsTransac::FLAG_TYPE_MSG_LIST_REQ + | RsNxsTransac::FLAG_BEGIN_P1; + transac->timestamp = 0; + transac->nItems = reqList.size(); + transac->PeerId(peerId); + transac->transactionNumber = transN; + NxsTransaction* newTrans = new NxsTransaction(); + newTrans->mItems = reqList; + newTrans->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM; newTrans->mTimeOut = time(NULL) + mTransactionTimeOut; - // create transaction copy with your id to indicate - // its an outgoing transaction - newTrans->mTransaction = new RsNxsTransac(*transac); - newTrans->mTransaction->PeerId(mOwnId); - sendItem(transac); - { - if (!locked_addTransaction(newTrans)) - delete newTrans; - } + // create transaction copy with your id to indicate + // its an outgoing transaction + newTrans->mTransaction = new RsNxsTransac(*transac); + newTrans->mTransaction->PeerId(mOwnId); + sendItem(transac); + + if (!locked_addTransaction(newTrans)) + delete newTrans; + + std::cerr << " Requested new transaction for " << reqList.size() << " items." << std::endl; } void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr) { - // to create a transaction you need to know who you are transacting with - // then what msgs to request - // then add an active Transaction for request - - std::list msgItemL; - - std::list::iterator lit = tr->mItems.begin(); - - // first get item list sent from transaction - for(; lit != tr->mItems.end(); ++lit) - { - RsNxsSyncMsgItem* item = dynamic_cast(*lit); - if(item) - { - msgItemL.push_back(item); - }else - { #ifdef NXS_NET_DEBUG - std::cerr << "RsGxsNetService::genReqMsgTransaction(): item failed cast to RsNxsSyncMsgItem* " - << std::endl; + std::cerr << "RsGxsNetService::genReqMsgTransaction()" << std::endl; #endif - } - } - if(msgItemL.empty()) - return; + // to create a transaction you need to know who you are transacting with + // then what msgs to request + // then add an active Transaction for request - // get grp id for this transaction - RsNxsSyncMsgItem* item = msgItemL.front(); - const RsGxsGroupId& grpId = item->grpId; + std::list msgItemL; + std::list::iterator lit = tr->mItems.begin(); - std::map grpMetaMap; - grpMetaMap[grpId] = NULL; - mDataStore->retrieveGxsGrpMetaData(grpMetaMap); - RsGxsGrpMetaData* grpMeta = grpMetaMap[grpId]; + // first get item list sent from transaction + for(; lit != tr->mItems.end(); ++lit) + { + RsNxsSyncMsgItem* item = dynamic_cast(*lit); + if(item) + { + msgItemL.push_back(item); + }else + { +#ifdef NXS_NET_DEBUG + std::cerr << "RsGxsNetService::genReqMsgTransaction(): item failed cast to RsNxsSyncMsgItem* " + << std::endl; +#endif + } + } +#ifdef NXS_NET_DEBUG + std::cerr << " found " << msgItemL.size()<< " messages in this transaction." << std::endl; +#endif - int cutoff = 0; - if(grpMeta != NULL) - cutoff = grpMeta->mReputationCutOff; -// -// // you want to find out if you can receive it -// // number polls essentially represent multiple -// // of sleep interval -// if(grpMeta) -// { -// // always can receive, only provides weak guaranttee this peer is part of the group -// bool can = true;//locked_canReceive(grpMeta, tr->mTransaction->PeerId()); -// -// delete grpMeta; -// -// if(!can) -// return; -// -// }else -// { -// return; -// } + if(msgItemL.empty()) + return; + // get grp id for this transaction + RsNxsSyncMsgItem* item = msgItemL.front(); + const RsGxsGroupId& grpId = item->grpId; - GxsMsgReq reqIds; - reqIds[grpId] = std::vector(); - GxsMsgMetaResult result; - mDataStore->retrieveGxsMsgMetaData(reqIds, result); - std::vector &msgMetaV = result[grpId]; +#ifdef NXS_NET_DEBUG + std::cerr << " grpId = " << grpId << std::endl; + std::cerr << " retrieving grp mesta data..." << std::endl; +#endif + std::map grpMetaMap; + grpMetaMap[grpId] = NULL; + mDataStore->retrieveGxsGrpMetaData(grpMetaMap); + RsGxsGrpMetaData* grpMeta = grpMetaMap[grpId]; - std::vector::const_iterator vit = msgMetaV.begin(); - std::set msgIdSet; + int cutoff = 0; + if(grpMeta != NULL) + cutoff = grpMeta->mReputationCutOff; - // put ids in set for each searching - for(; vit != msgMetaV.end(); ++vit) - { - msgIdSet.insert((*vit)->mMsgId); - delete(*vit); - } - msgMetaV.clear(); + GxsMsgReq reqIds; + reqIds[grpId] = std::vector(); + GxsMsgMetaResult result; + mDataStore->retrieveGxsMsgMetaData(reqIds, result); + std::vector &msgMetaV = result[grpId]; - // get unique id for this transaction - uint32_t transN = locked_getTransactionId(); +#ifdef NXS_NET_DEBUG + std::cerr << " retrieving grp message list..." << std::endl; + std::cerr << " grp locally contains " << msgMetaV.size() << " messsages." << std::endl; +#endif + std::vector::const_iterator vit = msgMetaV.begin(); + std::set msgIdSet; - // add msgs that you don't have to request list - std::list::iterator llit = msgItemL.begin(); - std::list reqList; + // put ids in set for each searching + for(; vit != msgMetaV.end(); ++vit) + { + msgIdSet.insert((*vit)->mMsgId); + delete(*vit); + } + msgMetaV.clear(); - const RsPeerId peerFrom = tr->mTransaction->PeerId(); +#ifdef NXS_NET_DEBUG + std::cerr << " grp locally contains " << msgIdSet.size() << " unique messsages." << std::endl; +#endif + // get unique id for this transaction + uint32_t transN = locked_getTransactionId(); - MsgAuthorV toVet; +#ifdef NXS_NET_DEBUG + std::cerr << " new transaction ID: " << transN << std::endl; +#endif + // add msgs that you don't have to request list + std::list::iterator llit = msgItemL.begin(); + std::list reqList; + int reqListSize = 0 ; - std::list peers; - peers.push_back(tr->mTransaction->PeerId()); + const RsPeerId peerFrom = tr->mTransaction->PeerId(); - for(; llit != msgItemL.end(); ++llit) - { - RsNxsSyncMsgItem*& syncItem = *llit; - const RsGxsMessageId& msgId = syncItem->msgId; + MsgAuthorV toVet; - if(msgIdSet.find(msgId) == msgIdSet.end()){ + std::list peers; + peers.push_back(tr->mTransaction->PeerId()); + bool reqListSizeExceeded = false ; - // if reputation is in reputations cache then proceed - // or if there isn't an author (note as author requirement is - // enforced at service level, if no author is needed then reputation - // filtering is optional) - bool noAuthor = syncItem->authorId.isNull(); +#ifdef NXS_NET_DEBUG + std::cerr << " sorting items..." << std::endl; +#endif + for(; llit != msgItemL.end(); ++llit) + { + RsNxsSyncMsgItem*& syncItem = *llit; + const RsGxsMessageId& msgId = syncItem->msgId; - // grp meta must be present if author present - if(!noAuthor && grpMeta == NULL) - continue; +#ifdef NXS_NET_DEBUG + std::cerr << " msg ID = " << msgId ; +#endif + if(reqListSize >= MAX_REQLIST_SIZE) + { +#ifdef NXS_NET_DEBUG + std::cerr << ". reqlist too big. Pruning out this item for now." << std::endl; +#endif + reqListSizeExceeded = true ; + continue ; // we should actually break, but we need to print some debug info. + } - if(mReputations->haveReputation(syncItem->authorId) || noAuthor) - { - GixsReputation rep; + if(reqListSize < MAX_REQLIST_SIZE && msgIdSet.find(msgId) == msgIdSet.end()) + { - if(!noAuthor) - mReputations->getReputation(syncItem->authorId, rep); + // if reputation is in reputations cache then proceed + // or if there isn't an author (note as author requirement is + // enforced at service level, if no author is needed then reputation + // filtering is optional) + bool noAuthor = syncItem->authorId.isNull(); - // if author is required for this message, it will simply get dropped - // at genexchange side of things - if(rep.score > grpMeta->mReputationCutOff || noAuthor) - { - RsNxsSyncMsgItem* msgItem = new RsNxsSyncMsgItem(mServType); - msgItem->grpId = grpId; - msgItem->msgId = msgId; - msgItem->flag = RsNxsSyncMsgItem::FLAG_REQUEST; - msgItem->transactionNumber = transN; - msgItem->PeerId(peerFrom); - reqList.push_back(msgItem); - } - } - else - { - // preload for speed - mReputations->loadReputation(syncItem->authorId, peers); - MsgAuthEntry entry; - entry.mAuthorId = syncItem->authorId; - entry.mGrpId = syncItem->grpId; - entry.mMsgId = syncItem->msgId; - toVet.push_back(entry); - } - } - } +#ifdef NXS_NET_DEBUG + std::cerr << ", reqlist size=" << reqListSize << ", message not present." ; +#endif + // grp meta must be present if author present + if(!noAuthor && grpMeta == NULL) + { + std::cerr << ", no group meta found. Givign up." << std::endl; + continue; + } - if(!toVet.empty()) - { - MsgRespPending* mrp = new MsgRespPending(mReputations, tr->mTransaction->PeerId(), toVet, cutoff); - mPendingResp.push_back(mrp); - } + if(mReputations->haveReputation(syncItem->authorId) || noAuthor) + { + GixsReputation rep; - if(!reqList.empty()) - { - locked_pushMsgTransactionFromList(reqList, tr->mTransaction->PeerId(), transN); - } +#ifdef NXS_NET_DEBUG + std::cerr << ", author Id=" << syncItem->authorId << ". Reputation: " ; +#endif + if(!noAuthor) + mReputations->getReputation(syncItem->authorId, rep); + + // if author is required for this message, it will simply get dropped + // at genexchange side of things + if(rep.score > (int)grpMeta->mReputationCutOff || noAuthor) + { +#ifdef NXS_NET_DEBUG + std::cerr << ", passed! Adding message to req list." << std::endl; +#endif + RsNxsSyncMsgItem* msgItem = new RsNxsSyncMsgItem(mServType); + msgItem->grpId = grpId; + msgItem->msgId = msgId; + msgItem->flag = RsNxsSyncMsgItem::FLAG_REQUEST; + msgItem->transactionNumber = transN; + msgItem->PeerId(peerFrom); + reqList.push_back(msgItem); + ++reqListSize ; + } +#ifdef NXS_NET_DEBUG + else + std::cerr << ", failed!" << std::endl; +#endif + } + else + { +#ifdef NXS_NET_DEBUG + std::cerr << ", no author/no reputation. Pushed to Vetting list." << std::endl; +#endif + // preload for speed + mReputations->loadReputation(syncItem->authorId, peers); + MsgAuthEntry entry; + entry.mAuthorId = syncItem->authorId; + entry.mGrpId = syncItem->grpId; + entry.mMsgId = syncItem->msgId; + toVet.push_back(entry); + } + } +#ifdef NXS_NET_DEBUG + else + std::cerr << ". already here." << std::endl; +#endif + } + + if(!toVet.empty()) + { +#ifdef NXS_NET_DEBUG + std::cerr << " Vetting list: " << toVet.size() << " elements." << std::endl; +#endif + MsgRespPending* mrp = new MsgRespPending(mReputations, tr->mTransaction->PeerId(), toVet, cutoff); + mPendingResp.push_back(mrp); + } + + if(!reqList.empty()) + { +#ifdef NXS_NET_DEBUG + std::cerr << " Request list: " << reqList.size() << " elements." << std::endl; +#endif + locked_pushMsgTransactionFromList(reqList, tr->mTransaction->PeerId(), transN); + + if(reqListSizeExceeded) + { +#ifdef NXS_NET_DEBUG + std::cerr << " Marking update operation as unfinished." << std::endl; +#endif + mPartialMsgUpdates[tr->mTransaction->PeerId()].insert(item->grpId) ; + } + else + { +#ifdef NXS_NET_DEBUG + std::cerr << " Marking update operation as terminal." << std::endl; +#endif + mPartialMsgUpdates[tr->mTransaction->PeerId()].erase(item->grpId) ; + } + } } void RsGxsNetService::locked_pushGrpTransactionFromList( @@ -1828,8 +2043,9 @@ void RsGxsNetService::locked_genReqGrpTransaction(NxsTransaction* tr) // then what grps to request // then add an active Transaction for request - std::list grpItemL; + std::cerr << "locked_genReqGrpTransaction(): " << std::endl; + std::list grpItemL; std::list::iterator lit = tr->mItems.begin(); for(; lit != tr->mItems.end(); ++lit) @@ -1842,7 +2058,7 @@ void RsGxsNetService::locked_genReqGrpTransaction(NxsTransaction* tr) { #ifdef NXS_NET_DEBUG std::cerr << "RsGxsNetService::genReqGrpTransaction(): item failed to caste to RsNxsSyncMsgItem* " - << std::endl; + << std::endl; #endif } } @@ -1872,10 +2088,10 @@ void RsGxsNetService::locked_genReqGrpTransaction(NxsTransaction* tr) { haveItem = true; latestVersion = grpSyncItem->publishTs > metaIter->second->mPublishTs; - } - - if(!haveItem || (haveItem && latestVersion) ){ + } + if(!haveItem || latestVersion) + { // determine if you need to check reputation bool checkRep = !grpSyncItem->authorId.isNull(); @@ -1888,13 +2104,16 @@ void RsGxsNetService::locked_genReqGrpTransaction(NxsTransaction* tr) GixsReputation rep; mReputations->getReputation(grpSyncItem->authorId, rep); - if(rep.score > GIXS_CUT_OFF) - { - addGroupItemToList(tr, grpId, transN, reqList); - } - } + if(rep.score >= GIXS_CUT_OFF) + { + addGroupItemToList(tr, grpId, transN, reqList); + std::cerr << " reputation cut off: limit=" << GIXS_CUT_OFF << " value=" << rep.score << ": allowed." << std::endl; + } + else + std::cerr << " reputation cut off: limit=" << GIXS_CUT_OFF << " value=" << rep.score << ": you shall not pass." << std::endl; + } else - { + { // preload reputation for later mReputations->loadReputation(grpSyncItem->authorId, peers); GrpAuthEntry entry; @@ -1906,7 +2125,7 @@ void RsGxsNetService::locked_genReqGrpTransaction(NxsTransaction* tr) else { addGroupItemToList(tr, grpId, transN, reqList); - } + } } } @@ -1952,7 +2171,8 @@ void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr) if (item) { grps[item->grpId] = NULL; - }else + } + else { #ifdef NXS_NET_DEBUG std::cerr << "RsGxsNetService::locked_genSendGrpsTransaction(): item failed to caste to RsNxsSyncGrpItem* " @@ -1965,9 +2185,8 @@ void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr) { mDataStore->retrieveNxsGrps(grps, false, false); } - else{ - return; - } + else + return; NxsTransaction* newTr = new NxsTransaction(); newTr->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM; @@ -2010,7 +2229,7 @@ void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr) locked_addTransaction(newTr); - return; + return; } void RsGxsNetService::runVetting() @@ -2224,12 +2443,15 @@ bool RsGxsNetService::locked_addTransaction(NxsTransaction* tr) if(transNumExist){ #ifdef NXS_NET_DEBUG std::cerr << "locked_addTransaction() " << std::endl; - std::cerr << "Transaction number exist already, transN: " << transN - << std::endl; + std::cerr << "Transaction number exist already, transN: " << transN << std::endl; #endif return false; }else{ - transMap[transN] = tr; +#ifdef NXS_NET_DEBUG + std::cerr << "locked_addTransaction() " << std::endl; + std::cerr << "Added transaction number " << transN << std::endl; +#endif + transMap[transN] = tr; return true; } } @@ -2497,12 +2719,12 @@ bool RsGxsNetService::canSendGrpId(const RsPeerId& sslId, RsGxsGrpMetaData& grpM return true; } -bool RsGxsNetService::checkCanRecvMsgFromPeer(const RsPeerId& sslId, - const RsGxsGrpMetaData& grpMeta) { +bool RsGxsNetService::checkCanRecvMsgFromPeer(const RsPeerId& sslId, const RsGxsGrpMetaData& grpMeta) +{ #ifdef NXS_NET_DEBUG std::cerr << "RsGxsNetService::checkCanRecvMsgFromPeer()"; - std::cerr << std::endl; + std::cerr << " peer Id = " << sslId << ", grpId=" << grpMeta.mGroupId <isLoaded(circleId)) { #ifdef NXS_NET_DEBUG - std::cerr << "RsGxsNetService::checkCanRecvMsgFromPeer() EXTERNAL_CIRCLE, checking mCircles->canSend"; + std::cerr << " EXTERNAL_CIRCLE, checking mCircles->canSend"; std::cerr << std::endl; #endif const RsPgpId& pgpId = mPgpUtils->getPGPId(sslId); @@ -2557,7 +2779,7 @@ bool RsGxsNetService::checkCanRecvMsgFromPeer(const RsPeerId& sslId, if(circleType == GXS_CIRCLE_TYPE_YOUREYESONLY) // do not attempt to sync msg unless to originator or those permitted { #ifdef NXS_NET_DEBUG - std::cerr << "RsGxsNetService::checkCanRecvMsgFromPeer() YOUREYESONLY, checking further"; + std::cerr << " YOUREYESONLY, checking further"; std::cerr << std::endl; #endif // a non empty internal circle id means this @@ -2566,18 +2788,18 @@ bool RsGxsNetService::checkCanRecvMsgFromPeer(const RsPeerId& sslId, { const RsGxsCircleId& internalCircleId = grpMeta.mInternalCircle; #ifdef NXS_NET_DEBUG - std::cerr << "RsGxsNetService::canSendGrpId() have mInternalCircle - we are Group creator"; + std::cerr << " have mInternalCircle - we are Group creator"; std::cerr << std::endl; - std::cerr << "RsGxsNetService::canSendGrpId() mCircleId: " << grpMeta.mCircleId; + std::cerr << " mCircleId: " << grpMeta.mCircleId; std::cerr << std::endl; - std::cerr << "RsGxsNetService::canSendGrpId() mInternalCircle: " << grpMeta.mInternalCircle; + std::cerr << " mInternalCircle: " << grpMeta.mInternalCircle; std::cerr << std::endl; #endif if(mCircles->isLoaded(internalCircleId)) { #ifdef NXS_NET_DEBUG - std::cerr << "RsGxsNetService::canSendGrpId() circle Loaded - checking mCircles->canSend"; + std::cerr << " circle Loaded - checking mCircles->canSend"; std::cerr << std::endl; #endif const RsPgpId& pgpId = mPgpUtils->getPGPId(sslId); @@ -2593,13 +2815,13 @@ bool RsGxsNetService::checkCanRecvMsgFromPeer(const RsPeerId& sslId, // an empty internal circle id means this peer can only // send circle related info from peer he received it #ifdef NXS_NET_DEBUG - std::cerr << "RsGxsNetService::canSendGrpId() mInternalCircle not set, someone else's personal circle"; + std::cerr << " mInternalCircle not set, someone else's personal circle"; std::cerr << std::endl; #endif if(grpMeta.mOriginator == sslId) { #ifdef NXS_NET_DEBUG - std::cerr << "RsGxsNetService::canSendGrpId() Originator matches -> can send"; + std::cerr << " Originator matches -> can send"; std::cerr << std::endl; #endif return true; @@ -2607,7 +2829,7 @@ bool RsGxsNetService::checkCanRecvMsgFromPeer(const RsPeerId& sslId, else { #ifdef NXS_NET_DEBUG - std::cerr << "RsGxsNetService::canSendGrpId() Originator doesn't match -> cannot send"; + std::cerr << " Originator doesn't match -> cannot send"; std::cerr << std::endl; #endif return false; @@ -2708,15 +2930,18 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsg* item) void RsGxsNetService::locked_pushMsgRespFromList(std::list& itemL, const RsPeerId& sslId, const uint32_t& transN) { - NxsTransaction* tr = new NxsTransaction(); +#ifdef NXS_NET_DEBUG + std::cerr << "locked_pushMsgResponseFromList()" << std::endl; + std::cerr << " nelems = " << itemL.size() << std::endl; + std::cerr << " peerId = " << sslId << std::endl; + std::cerr << " transN = " << transN << std::endl; +#endif + NxsTransaction* tr = new NxsTransaction(); tr->mItems = itemL; tr->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM; RsNxsTransac* trItem = new RsNxsTransac(mServType); - trItem->transactFlag = RsNxsTransac::FLAG_BEGIN_P1 - | RsNxsTransac::FLAG_TYPE_MSG_LIST_RESP; - - trItem->nItems = itemL.size(); - + trItem->transactFlag = RsNxsTransac::FLAG_BEGIN_P1 | RsNxsTransac::FLAG_TYPE_MSG_LIST_RESP; + trItem->nItems = itemL.size(); trItem->timestamp = 0; trItem->PeerId(sslId); trItem->transactionNumber = transN; @@ -2872,7 +3097,6 @@ void RsGxsNetService::processExplicitGroupRequests() mExplicitRequest.clear(); } -#define NXS_NET_DEBUG int RsGxsNetService::sharePublishKey(const RsGxsGroupId& grpId,const std::list& peers) { RS_STACK_MUTEX(mNxsMutex) ; diff --git a/libretroshare/src/gxs/rsgxsnetservice.h b/libretroshare/src/gxs/rsgxsnetservice.h index dbc73f98f..0d3d17c10 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.h +++ b/libretroshare/src/gxs/rsgxsnetservice.h @@ -438,8 +438,6 @@ private: // for an active transaction uint32_t mTransactionTimeOut; - std::map > mPendingPublishKeyRecipients ; - RsPeerId mOwnId; RsNxsNetMgr* mNetMgr; @@ -460,8 +458,9 @@ private: // need to be verfied std::vector mPendingResp; std::vector mPendingCircleVets; - + std::map > mPendingPublishKeyRecipients ; std::map > mExplicitRequest; + std::map > mPartialMsgUpdates ; // nxs sync optimisation // can pull dynamically the latest timestamp for each message diff --git a/libretroshare/src/serialiser/rsgxsupdateitems.h b/libretroshare/src/serialiser/rsgxsupdateitems.h index abd98dcdb..36c37e42e 100644 --- a/libretroshare/src/serialiser/rsgxsupdateitems.h +++ b/libretroshare/src/serialiser/rsgxsupdateitems.h @@ -79,8 +79,8 @@ public: { clear();} virtual ~RsGxsMsgUpdateItem() {} - virtual void clear(); - virtual std::ostream &print(std::ostream &out, uint16_t indent); + virtual void clear(); + virtual std::ostream &print(std::ostream &out, uint16_t indent); RsPeerId peerId; std::map msgUpdateTS; diff --git a/libretroshare/src/serialiser/rsnxsitems.cc b/libretroshare/src/serialiser/rsnxsitems.cc index 4a691eef2..f9bfaf343 100644 --- a/libretroshare/src/serialiser/rsnxsitems.cc +++ b/libretroshare/src/serialiser/rsnxsitems.cc @@ -27,10 +27,10 @@ const uint16_t RsNxsTransac::FLAG_END_FAIL_FULL = 0x0040; /** transaction type **/ const uint16_t RsNxsTransac::FLAG_TYPE_GRP_LIST_RESP = 0x0100; const uint16_t RsNxsTransac::FLAG_TYPE_MSG_LIST_RESP = 0x0200; -const uint16_t RsNxsTransac::FLAG_TYPE_GRP_LIST_REQ = 0x0400; -const uint16_t RsNxsTransac::FLAG_TYPE_MSG_LIST_REQ = 0x0800; -const uint16_t RsNxsTransac::FLAG_TYPE_GRPS = 0x1000; -const uint16_t RsNxsTransac::FLAG_TYPE_MSGS = 0x2000; +const uint16_t RsNxsTransac::FLAG_TYPE_GRP_LIST_REQ = 0x0400; +const uint16_t RsNxsTransac::FLAG_TYPE_MSG_LIST_REQ = 0x0800; +const uint16_t RsNxsTransac::FLAG_TYPE_GRPS = 0x1000; +const uint16_t RsNxsTransac::FLAG_TYPE_MSGS = 0x2000; uint32_t RsNxsSerialiser::size(RsItem *item) {