fixed some serialising bugs, and added proper notification of observer

This commit is contained in:
csoler 2015-12-19 19:00:06 -05:00
parent 6910ad3695
commit 5fcaa36736
2 changed files with 44 additions and 16 deletions

View File

@ -655,8 +655,8 @@ void RsGxsNetService::syncGrpStatistics()
mDataStore->retrieveGxsGrpMetaData(grpMeta); mDataStore->retrieveGxsGrpMetaData(grpMeta);
std::set<RsPeerId> peers; std::set<RsPeerId> online_peers;
mNetMgr->getOnlineList(mServiceInfo.mServiceType, peers); mNetMgr->getOnlineList(mServiceInfo.mServiceType, online_peers);
// Go through group statistics and groups without information are re-requested to random peers selected // Go through group statistics and groups without information are re-requested to random peers selected
// among the ones who provided the group info. // among the ones who provided the group info.
@ -684,25 +684,28 @@ void RsGxsNetService::syncGrpStatistics()
++rit ; ++rit ;
for(uint32_t i=0;i<std::min(rec.suppliers.size(),(size_t)GROUP_STATS_UPDATE_NB_PEERS);++i) for(uint32_t i=0;i<std::min(rec.suppliers.size(),(size_t)GROUP_STATS_UPDATE_NB_PEERS);++i)
{ {
RsPeerId peer_id = *rit ; RsPeerId peer_id = *rit ;
++rit ;
++rit ; if(online_peers.find(peer_id) != online_peers.end()) // check that the peer is online
if(rit == rec.suppliers.end()) {
rit = rec.suppliers.begin() ; if(rit == rec.suppliers.end())
rit = rec.suppliers.begin() ;
#ifdef NXS_NET_DEBUG_0 #ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_PG(peer_id,it->first) << " asking friend " << peer_id << " for an update of stats for group " << it->first << std::endl; GXSNETDEBUG_PG(peer_id,it->first) << " asking friend " << peer_id << " for an update of stats for group " << it->first << std::endl;
#endif #endif
RsNxsSyncGrpStats *grs = new RsNxsSyncGrpStats(mServType) ; RsNxsSyncGrpStats *grs = new RsNxsSyncGrpStats(mServType) ;
grs->request_type = RsNxsSyncGrpStats::GROUP_INFO_TYPE_REQUEST ; grs->request_type = RsNxsSyncGrpStats::GROUP_INFO_TYPE_REQUEST ;
grs->grpId = it->first ; grs->grpId = it->first ;
grs->PeerId(peer_id) ; grs->PeerId(peer_id) ;
sendItem(grs) ; sendItem(grs) ;
} }
}
} }
#ifdef NXS_NET_DEBUG_0 #ifdef NXS_NET_DEBUG_0
else else
@ -725,6 +728,14 @@ void RsGxsNetService::handleRecvSyncGrpStatistics(RsNxsSyncGrpStats *grs)
RsGxsGrpMetaData* grpMeta = grpMetas[grs->grpId]; RsGxsGrpMetaData* grpMeta = grpMetas[grs->grpId];
if(grpMeta == NULL)
{
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << " Group is unknown. Not reponding." << std::endl;
#endif
return ;
}
// check if we're subscribed or not // check if we're subscribed or not
if(! (grpMeta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED )) if(! (grpMeta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED ))
@ -782,9 +793,15 @@ void RsGxsNetService::handleRecvSyncGrpStatistics(RsNxsSyncGrpStats *grs)
RS_STACK_MUTEX(mNxsMutex) ; RS_STACK_MUTEX(mNxsMutex) ;
RsGroupNetworkStatsRecord& rec(mGroupNetworkStats[grs->grpId]) ; RsGroupNetworkStatsRecord& rec(mGroupNetworkStats[grs->grpId]) ;
int32_t old_count = rec.max_visible_count ;
int32_t old_suppliers_count = rec.suppliers.size() ;
rec.suppliers.insert(grs->PeerId()) ; rec.suppliers.insert(grs->PeerId()) ;
rec.max_visible_count = std::max(rec.max_visible_count,grs->number_of_posts) ; rec.max_visible_count = std::max(rec.max_visible_count,grs->number_of_posts) ;
rec.update_TS = time(NULL) ; rec.update_TS = time(NULL) ;
if (old_count != rec.max_visible_count || old_suppliers_count != rec.suppliers.size())
mObserver->notifyChangedGroupStats(grs->grpId);
} }
else else
std::cerr << "(EE) RsGxsNetService::handleRecvSyncGrpStatistics(): unknown item type " << grs->request_type << " found. This is a bug." << std::endl; std::cerr << "(EE) RsGxsNetService::handleRecvSyncGrpStatistics(): unknown item type " << grs->request_type << " found. This is a bug." << std::endl;

View File

@ -38,6 +38,7 @@ uint32_t RsNxsSerialiser::size(RsItem *item) {
RsNxsGrp* ngp; RsNxsGrp* ngp;
RsNxsMsg* nmg; RsNxsMsg* nmg;
RsNxsSyncGrp* sg; RsNxsSyncGrp* sg;
RsNxsSyncGrpStats* sgs;
RsNxsSyncGrpItem* sgl; RsNxsSyncGrpItem* sgl;
RsNxsSyncMsg* sgm; RsNxsSyncMsg* sgm;
RsNxsSyncMsgItem* sgml; RsNxsSyncMsgItem* sgml;
@ -52,6 +53,9 @@ uint32_t RsNxsSerialiser::size(RsItem *item) {
{ {
return sizeNxsSyncGrp(sg); return sizeNxsSyncGrp(sg);
} else if((sgs = dynamic_cast<RsNxsSyncGrpStats*>(item)) != NULL)
{
return sizeNxsSyncGrpStats(sgs);
}else if(( ntx = dynamic_cast<RsNxsTransac*>(item)) != NULL){ }else if(( ntx = dynamic_cast<RsNxsTransac*>(item)) != NULL){
return sizeNxsTrans(ntx); return sizeNxsTrans(ntx);
} }
@ -109,6 +113,8 @@ RsItem* RsNxsSerialiser::deserialise(void *data, uint32_t *size) {
return deserialNxsSyncMsgItem(data, size); return deserialNxsSyncMsgItem(data, size);
case RS_PKT_SUBTYPE_NXS_GRP: case RS_PKT_SUBTYPE_NXS_GRP:
return deserialNxsGrp(data, size); return deserialNxsGrp(data, size);
case RS_PKT_SUBTYPE_NXS_SYNC_GRP_STATS:
return deserialNxsSyncGrpStats(data, size);
case RS_PKT_SUBTYPE_NXS_MSG: case RS_PKT_SUBTYPE_NXS_MSG:
return deserialNxsMsg(data, size); return deserialNxsMsg(data, size);
case RS_PKT_SUBTYPE_NXS_TRANS: case RS_PKT_SUBTYPE_NXS_TRANS:
@ -134,6 +140,7 @@ bool RsNxsSerialiser::serialise(RsItem *item, void *data, uint32_t *size){
RsNxsGrp* ngp; RsNxsGrp* ngp;
RsNxsMsg* nmg; RsNxsMsg* nmg;
RsNxsSyncGrp* sg; RsNxsSyncGrp* sg;
RsNxsSyncGrpStats* sgs;
RsNxsSyncGrpItem* sgl; RsNxsSyncGrpItem* sgl;
RsNxsSyncMsg* sgm; RsNxsSyncMsg* sgm;
RsNxsSyncMsgItem* sgml; RsNxsSyncMsgItem* sgml;
@ -144,6 +151,10 @@ bool RsNxsSerialiser::serialise(RsItem *item, void *data, uint32_t *size){
{ {
return serialiseNxsSyncGrp(sg, data, size); return serialiseNxsSyncGrp(sg, data, size);
}else if((sgs = dynamic_cast<RsNxsSyncGrpStats*>(item)) != NULL)
{
return serialiseNxsSyncGrpStats(sgs, data, size);
}else if ((ntx = dynamic_cast<RsNxsTransac*>(item)) != NULL) }else if ((ntx = dynamic_cast<RsNxsTransac*>(item)) != NULL)
{ {
return serialiseNxsTrans(ntx, data, size); return serialiseNxsTrans(ntx, data, size);