- added more debug info to gxs net service so as to see what's going on

- force-updating timestamp of unsubscribed groups to avoid re-asking them indefinitly
This should remove the heavy GXS traffic. 



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@7883 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2015-01-28 22:48:59 +00:00
parent 6be1cb82b9
commit c455623738
4 changed files with 186 additions and 117 deletions

View File

@ -1066,12 +1066,14 @@ bool RsGenExchange::subscribeToGroup(uint32_t& token, const RsGxsGroupId& grpId,
{
if(subscribe)
setGroupSubscribeFlags(token, grpId, GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED,
(GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED | GXS_SERV::GROUP_SUBSCRIBE_NOT_SUBSCRIBED));
(GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED | GXS_SERV::GROUP_SUBSCRIBE_NOT_SUBSCRIBED));
else
setGroupSubscribeFlags(token, grpId, GXS_SERV::GROUP_SUBSCRIBE_NOT_SUBSCRIBED,
(GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED | GXS_SERV::GROUP_SUBSCRIBE_NOT_SUBSCRIBED));
(GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED | GXS_SERV::GROUP_SUBSCRIBE_NOT_SUBSCRIBED));
return true;
mNetService->subscribeStatusChanged(grpId,subscribe) ;
return true;
}
bool RsGenExchange::getGroupStatistic(const uint32_t& token, GxsGroupStatistic& stats)

View File

@ -37,6 +37,8 @@
/***
* #define NXS_NET_DEBUG 1
***/
// #define NXS_NET_DEBUG_0 1
// #define NXS_NET_DEBUG_1 1
#define GIXS_CUT_OFF 0
@ -221,20 +223,20 @@ RsMutex NxsBandwidthRecorder::mtx("Bandwidth recorder") ; // Protects the
void RsGxsNetService::syncWithPeers()
{
#ifdef NXS_NET_DEBUG
std::cerr << "RsGxsNetService::syncWithPeers() this=" << (void*)this << std::endl;
#ifdef NXS_NET_DEBUG_0
std::cerr << "RsGxsNetService::syncWithPeers() this=" << (void*)this << ". serviceInfo=" << mServiceInfo << std::endl;
#endif
static RsNxsSerialiser ser(mServType) ; // this is used to estimate bandwidth.
RS_STACK_MUTEX(mNxsMutex) ;
RS_STACK_MUTEX(mNxsMutex) ;
std::set<RsPeerId> peers;
mNetMgr->getOnlineList(mServiceInfo.mServiceType, peers);
std::set<RsPeerId> peers;
mNetMgr->getOnlineList(mServiceInfo.mServiceType, peers);
std::set<RsPeerId>::iterator sit = peers.begin();
std::set<RsPeerId>::iterator sit = peers.begin();
if(mGrpAutoSync)
if(mGrpAutoSync)
{
// for now just grps
for(; sit != peers.end(); ++sit)
@ -256,7 +258,7 @@ void RsGxsNetService::syncWithPeers()
NxsBandwidthRecorder::recordEvent(mServType,grp) ;
#ifdef NXS_NET_DEBUG
#ifdef NXS_NET_DEBUG_0
std::cerr << " sending RsNxsSyncGrp item to peer id: " << *sit << " ts=" << updateTS << std::endl;
#endif
sendItem(grp);
@ -265,58 +267,60 @@ void RsGxsNetService::syncWithPeers()
#ifndef GXS_DISABLE_SYNC_MSGS
typedef std::map<RsGxsGroupId, RsGxsGrpMetaData* > GrpMetaMap;
GrpMetaMap grpMeta;
typedef std::map<RsGxsGroupId, RsGxsGrpMetaData* > GrpMetaMap;
GrpMetaMap grpMeta;
mDataStore->retrieveGxsGrpMetaData(grpMeta);
mDataStore->retrieveGxsGrpMetaData(grpMeta);
GrpMetaMap::iterator
mit = grpMeta.begin();
GrpMetaMap::iterator
mit = grpMeta.begin();
GrpMetaMap toRequest;
GrpMetaMap toRequest;
for(; mit != grpMeta.end(); ++mit)
{
RsGxsGrpMetaData* meta = mit->second;
for(; mit != grpMeta.end(); ++mit)
{
RsGxsGrpMetaData* meta = mit->second;
// if(meta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED )
// {
// if(meta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED )
// {
toRequest.insert(std::make_pair(mit->first, meta));
// }else
// delete meta;
}
// }else
// delete meta;
}
grpMeta.clear();
grpMeta.clear();
sit = peers.begin();
sit = peers.begin();
float sending_probability = NxsBandwidthRecorder::computeCurrentSendingProbability() ;
#ifdef NXS_NET_DEBUG
#ifdef NXS_NET_DEBUG_0
std::cerr << " syncWithPeers(): Sending probability = " << sending_probability << std::endl;
#endif
// synchronise group msg for groups which we're subscribed to
for(; sit != peers.end(); ++sit)
{
const RsPeerId& peerId = *sit;
// Synchronise group msg for groups which we're subscribed to
// For each peer and each group, we send to the peer the time stamp of the most
// recent message we have. If the peer has more recent messages he will send them.
for(; sit != peers.end(); ++sit)
{
const RsPeerId& peerId = *sit;
// now see if you have an updateTS so optimise whether you need
// to get a new list of peer data
RsGxsMsgUpdateItem* mui = NULL;
// now see if you have an updateTS so optimise whether you need
// to get a new list of peer data
RsGxsMsgUpdateItem* mui = NULL;
ClientMsgMap::const_iterator cit = mClientMsgUpdateMap.find(peerId);
ClientMsgMap::const_iterator cit = mClientMsgUpdateMap.find(peerId);
if(cit != mClientMsgUpdateMap.end())
{
mui = cit->second;
}
#ifdef NXS_NET_DEBUG
if(cit != mClientMsgUpdateMap.end())
mui = cit->second;
#ifdef NXS_NET_DEBUG_0
std::cerr << " syncing messages with peer " << peerId << std::endl;
#endif
GrpMetaMap::const_iterator mmit = toRequest.begin();
for(; mmit != toRequest.end(); ++mmit)
GrpMetaMap::const_iterator mmit = toRequest.begin();
for(; mmit != toRequest.end(); ++mmit)
{
const RsGxsGrpMetaData* meta = mmit->second;
const RsGxsGroupId& grpId = mmit->first;
@ -324,15 +328,15 @@ void RsGxsNetService::syncWithPeers()
if(!checkCanRecvMsgFromPeer(peerId, *meta))
continue;
// On default, the info has never been received so the TS is 0.
uint32_t updateTS = 0;
if(mui)
{
std::map<RsGxsGroupId, uint32_t>::const_iterator cit2 = mui->msgUpdateTS.find(grpId);
if(cit2 != mui->msgUpdateTS.end())
{
updateTS = cit2->second;
}
}
RsNxsSyncMsg* msg = new RsNxsSyncMsg(mServType);
@ -346,28 +350,49 @@ void RsGxsNetService::syncWithPeers()
if(RSRandom::random_f32() < sending_probability)
{
sendItem(msg);
#ifdef NXS_NET_DEBUG
std::cerr << " sending RsNxsSyncMsg req for grpId=" << grpId << " to peer " << *sit << std::endl;
#ifdef NXS_NET_DEBUG_0
std::cerr << " sending RsNxsSyncMsg req for grpId=" << grpId << " to peer " << *sit << ", last TS=" << std::dec<< time(NULL) - updateTS << " secs ago." << std::endl;
#endif
}
else
{
delete msg ;
#ifdef NXS_NET_DEBUG
std::cerr << " cancel RsNxsSyncMsg req for grpId=" << grpId << *sit << ": not enough bandwidth." << std::endl;
#ifdef NXS_NET_DEBUG_0
std::cerr << " cancel RsNxsSyncMsg req for grpId=" << grpId << " to peer " << *sit << ": not enough bandwidth." << std::endl;
#endif
}
}
}
}
GrpMetaMap::iterator mmit = toRequest.begin();
for(; mmit != toRequest.end(); ++mmit)
{
delete mmit->second;
}
GrpMetaMap::iterator mmit = toRequest.begin();
for(; mmit != toRequest.end(); ++mmit)
{
delete mmit->second;
}
#endif
}
void RsGxsNetService::subscribeStatusChanged(const RsGxsGroupId& grpId,bool subscribed)
{
RS_STACK_MUTEX(mNxsMutex) ;
if(!subscribed)
return ;
// When we subscribe, we reset the time stamps, so that the entire group list
// gets requested once again, for a proper update.
#ifdef NXS_NET_DEBUG_0
std::cerr << "Changing subscribe status for grp " << grpId << " to " << subscribed << ": reseting all msg time stamps." << std::endl;
#endif
for(ClientMsgMap::iterator it(mClientMsgUpdateMap.begin());it!=mClientMsgUpdateMap.end();++it)
{
std::map<RsGxsGroupId,uint32_t>::iterator it2 = it->second->msgUpdateTS.find(grpId) ;
if(it2 != it->second->msgUpdateTS.end())
it->second->msgUpdateTS.erase(it2) ;
}
}
bool RsGxsNetService::fragmentMsg(RsNxsMsg& msg, MsgFragments& msgFragments) const
{
@ -845,14 +870,9 @@ bool RsGxsNetService::saveList(bool& cleanup, std::list<RsItem*>& save)
RS_STACK_MUTEX(mNxsMutex) ;
// hardcore templates
std::transform(mClientGrpUpdateMap.begin(), mClientGrpUpdateMap.end(),
std::back_inserter(save), get_second<ClientGrpMap>());
std::transform(mClientMsgUpdateMap.begin(), mClientMsgUpdateMap.end(),
std::back_inserter(save), get_second<ClientMsgMap>());
std::transform(mServerMsgUpdateMap.begin(), mServerMsgUpdateMap.end(),
std::back_inserter(save), get_second<ServerMsgMap>());
std::transform(mClientGrpUpdateMap.begin(), mClientGrpUpdateMap.end(), std::back_inserter(save), get_second<ClientGrpMap>());
std::transform(mClientMsgUpdateMap.begin(), mClientMsgUpdateMap.end(), std::back_inserter(save), get_second<ClientMsgMap>());
std::transform(mServerMsgUpdateMap.begin(), mServerMsgUpdateMap.end(), std::back_inserter(save), get_second<ServerMsgMap>());
save.push_back(mGrpServerUpdateItem);
@ -871,18 +891,18 @@ RsSerialiser *RsGxsNetService::setupSerialiser()
void RsGxsNetService::recvNxsItemQueue()
{
RsItem *item ;
RsItem *item ;
while(NULL != (item=recvItem()))
{
#ifdef NXS_NET_DEBUG
std::cerr << "RsGxsNetService Item:" << (void*)item << std::endl ;
while(NULL != (item=recvItem()))
{
#ifdef NXS_NET_DEBUG_1
std::cerr << "RsGxsNetService Item:" << (void*)item << std::endl ;
//item->print(std::cerr);
#endif
// RsNxsItem needs dynamic_cast, since they have derived siblings.
//
RsNxsItem *ni = dynamic_cast<RsNxsItem*>(item) ;
if(ni != NULL)
// RsNxsItem needs dynamic_cast, since they have derived siblings.
//
RsNxsItem *ni = dynamic_cast<RsNxsItem*>(item) ;
if(ni != NULL)
{
// a live transaction has a non zero value
if(ni->transactionNumber != 0)
@ -908,12 +928,12 @@ void RsGxsNetService::recvNxsItemQueue()
}
delete item ;
}
else
{
std::cerr << "Not a RsNxsItem, deleting!" << std::endl;
delete(item);
}
}
else
{
std::cerr << "Not a RsNxsItem, deleting!" << std::endl;
delete(item);
}
}
}
@ -1154,7 +1174,7 @@ void RsGxsNetService::updateServerSyncTS()
// as a grp list server also note this is the latest item you have
if(mGrpServerUpdateItem == NULL)
{
mGrpServerUpdateItem = new RsGxsServerGrpUpdateItem(mServType);
mGrpServerUpdateItem = new RsGxsServerGrpUpdateItem(mServType);
}
bool change = false;
@ -1194,9 +1214,7 @@ void RsGxsNetService::updateServerSyncTS()
if(change)
IndicateConfigChanged();
freeAndClearContainerResource<std::map<RsGxsGroupId, RsGxsGrpMetaData*>,
RsGxsGrpMetaData*>(gxsMap);
freeAndClearContainerResource<std::map<RsGxsGroupId, RsGxsGrpMetaData*>, RsGxsGrpMetaData*>(gxsMap);
}
bool RsGxsNetService::locked_checkTransacTimedOut(NxsTransaction* tr)
{
@ -1692,7 +1710,7 @@ void RsGxsNetService::locked_doMsgUpdateWork(const RsNxsTransac *nxsTrans, const
std::cerr << " this is a full update. Updating time stamp." << std::endl;
#endif
mui->msgUpdateTS[grpId] = nxsTrans->updateTS;
IndicateConfigChanged();
IndicateConfigChanged();
}
}
@ -1843,7 +1861,22 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
RsGxsGrpMetaData* grpMeta = grpMetaMap[grpId];
if(! (grpMeta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED ))
{
// For unsubscribed groups, we update the timestamp to now, so that the group content will not be asked to the same
// peer again, unless the peer has new info about it.
// That needs of course to reset that time to 0 when we subscribe.
RsGxsMsgUpdateItem *& pitem(mClientMsgUpdateMap[pid]) ;
if(pitem == NULL)
{
pitem = new RsGxsMsgUpdateItem(mServType) ;
pitem->peerId = pid ;
}
pitem->msgUpdateTS[grpId] = time(NULL) ;
return ;
}
int cutoff = 0;
if(grpMeta != NULL)
@ -2018,7 +2051,13 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
void RsGxsNetService::locked_pushGrpTransactionFromList(
std::list<RsNxsItem*>& reqList, const RsPeerId& peerId, const uint32_t& transN)
{
RsNxsTransac* transac = new RsNxsTransac(mServType);
#ifdef NXS_NET_DEBUG
std::cerr << "locked_pushGrpTransactionFromList()" << std::endl;
std::cerr << " nelems = " << reqList.size() << std::endl;
std::cerr << " peerId = " << peerId << std::endl;
std::cerr << " transN = " << transN << std::endl;
#endif
RsNxsTransac* transac = new RsNxsTransac(mServType);
transac->transactFlag = RsNxsTransac::FLAG_TYPE_GRP_LIST_REQ
| RsNxsTransac::FLAG_BEGIN_P1;
transac->timestamp = 0;
@ -2489,7 +2528,13 @@ void RsGxsNetService::cleanTransactionItems(NxsTransaction* tr) const
void RsGxsNetService::locked_pushGrpRespFromList(std::list<RsNxsItem*>& respList,
const RsPeerId& peer, const uint32_t& transN)
{
NxsTransaction* tr = new NxsTransaction();
#ifdef NXS_NET_DEBUG_0
std::cerr << "locked_pushGrpResponseFromList()" << std::endl;
std::cerr << " nelems = " << respList.size() << std::endl;
std::cerr << " peerId = " << peer << std::endl;
std::cerr << " transN = " << transN << std::endl;
#endif
NxsTransaction* tr = new NxsTransaction();
tr->mItems = respList;
tr->mFlag = NxsTransaction::FLAG_STATE_WAITING_CONFIRM;
@ -2512,17 +2557,18 @@ void RsGxsNetService::locked_pushGrpRespFromList(std::list<RsNxsItem*>& respList
bool RsGxsNetService::locked_CanReceiveUpdate(const RsNxsSyncGrp *item)
{
// don't sync if you have no new updates for this peer
if(mGrpServerUpdateItem)
{
if(item->updateTS >= mGrpServerUpdateItem->grpUpdateTS && item->updateTS != 0)
{
#ifdef NXS_NET_DEBUG
std::cerr << "RsGxsNetService::locked_CanReceiveUpdate() No Updates";
std::cerr << std::endl;
#ifdef NXS_NET_DEBUG_0
std::cerr << " local time stamp: " << std::dec<< time(NULL) - mGrpServerUpdateItem->grpUpdateTS << " secs ago. Update sent: " <<
(item->updateTS == 0 || item->updateTS < mGrpServerUpdateItem->grpUpdateTS) << std::endl;
#endif
return false;
}
return (item->updateTS == 0 || item->updateTS < mGrpServerUpdateItem->grpUpdateTS);
}
#ifdef NXS_NET_DEBUG_0
std::cerr << " no local time stamp. Client wants to receive the grp list. " << std::endl;
#endif
return true;
}
@ -2532,24 +2578,26 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item)
RS_STACK_MUTEX(mNxsMutex) ;
RsPeerId peer = item->PeerId();
#ifdef NXS_NET_DEBUG_0
std::cerr << "handleRecvSyncGroup(): from " << peer << ", TS = " << std::dec<< time(NULL) - item->updateTS << " secs ago" << std::endl;
#endif
if(!locked_CanReceiveUpdate(item))
{
#ifdef NXS_NET_DEBUG
#ifdef NXS_NET_DEBUG_1
std::cerr << "RsGxsNetService::handleRecvSyncGroup() Cannot RecvUpdate";
std::cerr << std::endl;
#endif
return;
}
RsPeerId peer = item->PeerId();
std::map<RsGxsGroupId, RsGxsGrpMetaData*> grp;
mDataStore->retrieveGxsGrpMetaData(grp);
if(grp.empty())
{
#ifdef NXS_NET_DEBUG
#ifdef NXS_NET_DEBUG_1
std::cerr << "RsGxsNetService::handleRecvSyncGroup() Grp Empty";
std::cerr << std::endl;
#endif
@ -2564,7 +2612,7 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item)
uint32_t transN = locked_getTransactionId();
std::vector<GrpIdCircleVet> toVet;
#ifdef NXS_NET_DEBUG
#ifdef NXS_NET_DEBUG_1
std::cerr << "RsGxsNetService::handleRecvSyncGroup() \nService: " << mServType << "\nGroup list beings being sent: " << std::endl;
#endif
@ -2590,13 +2638,8 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item)
gItem->PeerId(peer);
gItem->transactionNumber = transN;
itemL.push_back(gItem);
#ifdef NXS_NET_DEBUG
std::cerr << "RsGxsNetService::handleRecvSyncGroup";
std::cerr << std::endl;
std::cerr << "Group : " << grpMeta->mGroupName;
std::cerr << ", id: " << gItem->grpId;
std::cerr << ", authorId: " << gItem->authorId;
std::cerr << std::endl;
#ifdef NXS_NET_DEBUG_0
std::cerr << " sending item for Grp " << mit->first << " name=" << grpMeta->mGroupName << ", publishTS=" << std::dec<< time(NULL) - mit->second->mPublishTs << " secs ago to peer ID " << peer << std::endl;
#endif
}
}
@ -2609,7 +2652,10 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item)
mPendingCircleVets.push_back(new GrpCircleIdRequestVetting(mCircles, mPgpUtils, toVet, peer));
}
locked_pushGrpRespFromList(itemL, peer, transN);
#ifdef NXS_NET_DEBUG_0
std::cerr << " final list sent (after vetting): " << itemL.size() << " elements." << std::endl;
#endif
locked_pushGrpRespFromList(itemL, peer, transN);
return;
}
@ -2866,14 +2912,15 @@ bool RsGxsNetService::locked_CanReceiveUpdate(const RsNxsSyncMsg *item)
{
const RsGxsServerMsgUpdateItem *msui = cit->second;
if(item->updateTS >= msui->msgUpdateTS && item->updateTS != 0)
{
#ifdef NXS_NET_DEBUG
std::cerr << "RsGxsNetService::locked_CanReceiveUpdate(): Msgs up to date" << std::endl;
#ifdef NXS_NET_DEBUG_0
std::cerr << " local time stamp: " << std::dec<< time(NULL) - msui->msgUpdateTS << " secs ago. Update sent: " <<
(item->updateTS == 0 || item->updateTS < msui->msgUpdateTS) << std::endl;
#endif
return false;
}
return (item->updateTS < msui->msgUpdateTS || item->updateTS == 0) ;
}
#ifdef NXS_NET_DEBUG_0
std::cerr << " no local time stamp for this grp. Client wants to receive the grp list. " << std::endl;
#endif
return true;
}
void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsg* item)
@ -2884,11 +2931,15 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsg* item)
// even when the group doesn't need update.
mGroupNetworkStats[item->grpId].suppliers.insert(item->PeerId()) ;
const RsPeerId& peer = item->PeerId();
#ifdef NXS_NET_DEBUG_0
std::cerr << "handleRecvSyncMsg(): from " << peer << ", grpId=" << item->grpId << ", TS = " << time(NULL) - item->updateTS << " secs ago" << std::endl;
#endif
if(!locked_CanReceiveUpdate(item))
return;
const RsPeerId& peer = item->PeerId();
GxsMsgMetaResult metaResult;
GxsMsgReq req;
@ -2931,10 +2982,18 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsg* item)
mItem->PeerId(peer);
mItem->transactionNumber = transN;
itemL.push_back(mItem);
}
#ifdef NXS_NET_DEBUG_1
std::cerr << " sending info item for msg id " << mItem->msgId << std::endl;
#endif
}
if(!itemL.empty())
locked_pushMsgRespFromList(itemL, peer, transN);
if(!itemL.empty())
{
#ifdef NXS_NET_DEBUG_0
std::cerr << " sending final msg info list of " << itemL.size() << " items." << std::endl;
#endif
locked_pushMsgRespFromList(itemL, peer, transN);
}
}
std::vector<RsGxsMsgMetaData*>::iterator vit = msgMetas.begin();

View File

@ -143,6 +143,12 @@ public:
*/
virtual bool getGroupNetworkStats(const RsGxsGroupId& id,RsGroupNetworkStats& stats) ;
/*!
* Used to inform the net service that we changed subscription status. That helps
* optimising data transfer when e.g. unsubsribed groups are updated less often, etc
*/
virtual void subscribeStatusChanged(const RsGxsGroupId& id,bool subscribed) ;
/* p3Config methods */
public:

View File

@ -118,6 +118,8 @@ public:
*/
virtual bool getGroupNetworkStats(const RsGxsGroupId& grpId,RsGroupNetworkStats& stats)=0;
virtual void subscribeStatusChanged(const RsGxsGroupId& id,bool subscribed) =0;
/*!
* Request for this group is sent through to peers on your network
* and how many hops from them you've indicated