From 6910ad36959c3045e096dbd83607e4ae1c5292ec Mon Sep 17 00:00:00 2001 From: csoler Date: Sat, 19 Dec 2015 17:38:52 -0500 Subject: [PATCH 1/3] added stats exchange system to gather number of posts in unsubscribed groups without the need to actually DL the messages (reduced bw a lot) --- libretroshare/src/gxs/rsgxsdataaccess.cc | 2 +- libretroshare/src/gxs/rsgxsnetservice.cc | 245 +++++++++++++++++---- libretroshare/src/gxs/rsgxsnetservice.h | 10 +- libretroshare/src/serialiser/rsnxsitems.cc | 140 +++++++++++- libretroshare/src/serialiser/rsnxsitems.h | 45 +++- 5 files changed, 386 insertions(+), 56 deletions(-) diff --git a/libretroshare/src/gxs/rsgxsdataaccess.cc b/libretroshare/src/gxs/rsgxsdataaccess.cc index 94c2373fe..a4d6c0b57 100644 --- a/libretroshare/src/gxs/rsgxsdataaccess.cc +++ b/libretroshare/src/gxs/rsgxsdataaccess.cc @@ -1552,7 +1552,7 @@ bool RsGxsDataAccess::getGroupStatistic(GroupStatisticRequest *req) req->mGroupStatistic.mNumChildMsgsNew = 0; req->mGroupStatistic.mNumChildMsgsUnread = 0; - for(int i = 0; i < msgMetaV.size(); ++i) + for(uint32_t i = 0; i < msgMetaV.size(); ++i) { RsGxsMsgMetaData* m = msgMetaV[i]; req->mGroupStatistic.mTotalSizeOfMsgs += m->mMsgSize + m->serial_size(); diff --git a/libretroshare/src/gxs/rsgxsnetservice.cc b/libretroshare/src/gxs/rsgxsnetservice.cc index 38f5110c3..a654ae720 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.cc +++ b/libretroshare/src/gxs/rsgxsnetservice.cc @@ -207,12 +207,12 @@ NXS_NET_DEBUG_4 vetting NXS_NET_DEBUG_5 summary of transactions (useful to just know what comes in/out) ***/ -//#define NXS_NET_DEBUG_0 1 +#define NXS_NET_DEBUG_0 1 //#define NXS_NET_DEBUG_1 1 //#define NXS_NET_DEBUG_2 1 //#define NXS_NET_DEBUG_3 1 //#define NXS_NET_DEBUG_4 1 -//#define NXS_NET_DEBUG_5 1 +#define NXS_NET_DEBUG_5 1 #define GIXS_CUT_OFF 0 @@ -224,11 +224,13 @@ // A small value for MAX_REQLIST_SIZE is likely to help messages to propagate in a chaotic network, but will also slow them down. // A small SYNC_PERIOD fasten message propagation, but is likely to overload the server side of transactions (e.g. overload outqueues). // -#define SYNC_PERIOD 60 -#define MAX_REQLIST_SIZE 20 // No more than 20 items per msg request list => creates smaller transactions that are less likely to be cancelled. -#define TRANSAC_TIMEOUT 2000 // In seconds. Has been increased to avoid epidemic transaction cancelling due to overloaded outqueues. -#define SECURITY_DELAY_TO_FORCE_CLIENT_REUPDATE 3600 // force re-update if there happens to be a large delay between our server side TS and the client side TS of friends -#define REJECTED_MESSAGE_RETRY_DELAY 24*3600 // re-try rejected messages every 24hrs. Most of the time this is because the peer's reputation has changed. +#define SYNC_PERIOD 60 +#define MAX_REQLIST_SIZE 20 // No more than 20 items per msg request list => creates smaller transactions that are less likely to be cancelled. +#define TRANSAC_TIMEOUT 2000 // In seconds. Has been increased to avoid epidemic transaction cancelling due to overloaded outqueues. +#define SECURITY_DELAY_TO_FORCE_CLIENT_REUPDATE 3600 // force re-update if there happens to be a large delay between our server side TS and the client side TS of friends +#define REJECTED_MESSAGE_RETRY_DELAY 24*3600 // re-try rejected messages every 24hrs. Most of the time this is because the peer's reputation has changed. +#define GROUP_STATS_UPDATE_DELAY 1800 // update unsubscribed group statistics every 30 mins +#define GROUP_STATS_UPDATE_NB_PEERS 2 // update unsubscribed group statistics every 30 mins // Debug system to allow to print only for some IDs (group, Peer, etc) @@ -323,6 +325,8 @@ int RsGxsNetService::tick() if((elapsed) < now) { syncWithPeers(); + syncGrpStatistics(); + mSyncTs = now; } @@ -332,14 +336,13 @@ int RsGxsNetService::tick() mLastKeyPublishTs = now ; } + if(now > 3600 + mLastCleanRejectedMessages) { - sharePublishKeysPending() ; - mLastCleanRejectedMessages = now ; - } cleanRejectedMessages() ; - + } + return 1; } @@ -543,14 +546,14 @@ void RsGxsNetService::syncWithPeers() for(; mit != grpMeta.end(); ++mit) { - RsGxsGrpMetaData* meta = mit->second; + RsGxsGrpMetaData* meta = mit->second; - // This was commented out because we want to know how many messages are available for unsubscribed groups. - // if(meta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED ) - // { - toRequest.insert(std::make_pair(mit->first, meta)); - // }else - // delete meta; + // This was commented out because we want to know how many messages are available for unsubscribed groups. + + if(meta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED ) + toRequest.insert(std::make_pair(mit->first, meta)); + else + delete meta; } grpMeta.clear(); @@ -640,6 +643,153 @@ void RsGxsNetService::syncWithPeers() #endif } +void RsGxsNetService::syncGrpStatistics() +{ + RS_STACK_MUTEX(mNxsMutex) ; + +#ifdef NXS_NET_DEBUG_0 + GXSNETDEBUG___<< "Sync-ing group statistics." << std::endl; +#endif + typedef std::map GrpMetaMap; + GrpMetaMap grpMeta; + + mDataStore->retrieveGxsGrpMetaData(grpMeta); + + std::set peers; + mNetMgr->getOnlineList(mServiceInfo.mServiceType, peers); + + // Go through group statistics and groups without information are re-requested to random peers selected + // among the ones who provided the group info. + + time_t now = time(NULL) ; + + for(std::map::iterator it(grpMeta.begin());it!=grpMeta.end();++it) + { + RsGroupNetworkStatsRecord& rec(mGroupNetworkStats[it->first]) ; +#ifdef NXS_NET_DEBUG_0 + GXSNETDEBUG__G(it->first) << " group " << it->first ; +#endif + + if(rec.update_TS + GROUP_STATS_UPDATE_DELAY < now && rec.suppliers.size() > 0) + { +#ifdef NXS_NET_DEBUG_0 + GXSNETDEBUG__G(it->first) << " needs update. Randomly asking to some friends" << std::endl; +#endif + // randomly select 2 friends among the suppliers of this group + + uint32_t n = RSRandom::random_u32() % rec.suppliers.size() ; + + std::set::const_iterator rit = rec.suppliers.begin(); + for(int i=0;ifirst) << " asking friend " << peer_id << " for an update of stats for group " << it->first << std::endl; +#endif + + RsNxsSyncGrpStats *grs = new RsNxsSyncGrpStats(mServType) ; + + grs->request_type = RsNxsSyncGrpStats::GROUP_INFO_TYPE_REQUEST ; + grs->grpId = it->first ; + grs->PeerId(peer_id) ; + + sendItem(grs) ; + } + } +#ifdef NXS_NET_DEBUG_0 + else + GXSNETDEBUG__G(it->first) << " up to date." << std::endl; +#endif + } +} + +void RsGxsNetService::handleRecvSyncGrpStatistics(RsNxsSyncGrpStats *grs) +{ + if(grs->request_type == RsNxsSyncGrpStats::GROUP_INFO_TYPE_REQUEST) + { +#ifdef NXS_NET_DEBUG_0 + GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << "Received Grp update stats Request for group " << grs->grpId << " from friend " << grs->PeerId() << std::endl; +#endif + std::map grpMetas; + grpMetas[grs->grpId] = NULL; + + mDataStore->retrieveGxsGrpMetaData(grpMetas); + + RsGxsGrpMetaData* grpMeta = grpMetas[grs->grpId]; + + // check if we're subscribed or not + + if(! (grpMeta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED )) + { +#ifdef NXS_NET_DEBUG_0 + GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << " Group is not subscribed. Not reponding." << std::endl; +#endif + delete grpMeta ; + return ; + } + delete grpMeta ; + + // now count available messages + + GxsMsgReq reqIds; + reqIds[grs->grpId] = std::vector(); + GxsMsgMetaResult result; + +#ifdef NXS_NET_DEBUG_0 + GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << " retrieving message information." << std::endl; +#endif + mDataStore->retrieveGxsMsgMetaData(reqIds, result); + + const std::vector& vec(result[grs->grpId]) ; + + if(vec.empty()) // that means we don't have any, or there isn't any, but since the default is always 0, no need to send. + return ; + + RsNxsSyncGrpStats *grs_resp = new RsNxsSyncGrpStats(mServType) ; + grs_resp->request_type = RsNxsSyncGrpStats::GROUP_INFO_TYPE_RESPONSE ; + grs_resp->number_of_posts = vec.size(); + grs_resp->grpId = grs->grpId; + grs_resp->PeerId(grs->PeerId()) ; + + grs_resp->last_post_TS = 0 ; + + for(uint32_t i=0;ilast_post_TS < vec[i]->mPublishTs) + grs_resp->last_post_TS = vec[i]->mPublishTs; + + delete vec[i] ; + } +#ifdef NXS_NET_DEBUG_0 + GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << " sending back statistics item with " << vec.size() << " elements." << std::endl; +#endif + + sendItem(grs_resp) ; + } + else if(grs->request_type == RsNxsSyncGrpStats::GROUP_INFO_TYPE_RESPONSE) + { +#ifdef NXS_NET_DEBUG_0 + GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << "Received Grp update stats item from peer " << grs->PeerId() << " for group " << grs->grpId << ", reporting " << grs->number_of_posts << " posts." << std::endl; +#endif + RS_STACK_MUTEX(mNxsMutex) ; + RsGroupNetworkStatsRecord& rec(mGroupNetworkStats[grs->grpId]) ; + + rec.suppliers.insert(grs->PeerId()) ; + rec.max_visible_count = std::max(rec.max_visible_count,grs->number_of_posts) ; + rec.update_TS = time(NULL) ; + } + else + std::cerr << "(EE) RsGxsNetService::handleRecvSyncGrpStatistics(): unknown item type " << grs->request_type << " found. This is a bug." << std::endl; +} + void RsGxsNetService::subscribeStatusChanged(const RsGxsGroupId& grpId,bool subscribed) { RS_STACK_MUTEX(mNxsMutex) ; @@ -1118,27 +1268,29 @@ private: bool RsGxsNetService::loadList(std::list &load) { - RS_STACK_MUTEX(mNxsMutex) ; + RS_STACK_MUTEX(mNxsMutex) ; - std::for_each(load.begin(), load.end(), StoreHere(mClientGrpUpdateMap, mClientMsgUpdateMap, mServerMsgUpdateMap, mGrpServerUpdateItem)); + std::for_each(load.begin(), load.end(), StoreHere(mClientGrpUpdateMap, mClientMsgUpdateMap, mServerMsgUpdateMap, mGrpServerUpdateItem)); + time_t now = time(NULL); - for(ClientMsgMap::iterator it = mClientMsgUpdateMap.begin();it!=mClientMsgUpdateMap.end();++it) - for(std::map::const_iterator it2(it->second->msgUpdateInfos.begin());it2!=it->second->msgUpdateInfos.end();++it2) - { - RsGroupNetworkStatsRecord& gnsr = mGroupNetworkStats[it2->first] ; + for(ClientMsgMap::iterator it = mClientMsgUpdateMap.begin();it!=mClientMsgUpdateMap.end();++it) + for(std::map::const_iterator it2(it->second->msgUpdateInfos.begin());it2!=it->second->msgUpdateInfos.end();++it2) + { + RsGroupNetworkStatsRecord& gnsr = mGroupNetworkStats[it2->first] ; - // At each reload, divide the last count by 2. This gradually flushes old information away. + // At each reload, divide the last count by 2. This gradually flushes old information away. - gnsr.max_visible_count = std::max(it2->second.message_count,gnsr.max_visible_count/2) ; + gnsr.max_visible_count = std::max(it2->second.message_count,gnsr.max_visible_count/2) ; + gnsr.update_TS = now - GROUP_STATS_UPDATE_DELAY + (RSRandom::random_u32()%(GROUP_STATS_UPDATE_DELAY/10)) ; - // Similarly, we remove some of the suppliers randomly. If they are - // actual suppliers, they will come back automatically. If they are - // not, they will be forgotten. + // Similarly, we remove some of the suppliers randomly. If they are + // actual suppliers, they will come back automatically. If they are + // not, they will be forgotten. - if(RSRandom::random_f32() > 0.2) - gnsr.suppliers.insert(it->first) ; - } - return true; + if(RSRandom::random_f32() > 0.2) + gnsr.suppliers.insert(it->first) ; + } + return true; } #include @@ -1206,9 +1358,10 @@ void RsGxsNetService::recvNxsItemQueue() switch(ni->PacketSubType()) { - case RS_PKT_SUBTYPE_NXS_SYNC_GRP: handleRecvSyncGroup (dynamic_cast(ni)) ; break ; - case RS_PKT_SUBTYPE_NXS_SYNC_MSG: handleRecvSyncMessage (dynamic_cast(ni)) ; break ; - case RS_PKT_SUBTYPE_NXS_GRP_PUBLISH_KEY: handleRecvPublishKeys (dynamic_cast(ni)) ; break ; + case RS_PKT_SUBTYPE_NXS_SYNC_GRP_STATS: handleRecvSyncGrpStatistics (dynamic_cast(ni)) ; break ; + case RS_PKT_SUBTYPE_NXS_SYNC_GRP: handleRecvSyncGroup (dynamic_cast(ni)) ; break ; + case RS_PKT_SUBTYPE_NXS_SYNC_MSG: handleRecvSyncMessage (dynamic_cast(ni)) ; break ; + case RS_PKT_SUBTYPE_NXS_GRP_PUBLISH_KEY: handleRecvPublishKeys (dynamic_cast(ni)) ; break ; default: std::cerr << "Unhandled item subtype " << (uint32_t) ni->PacketSubType() << " in RsGxsNetService: " << std::endl; break; } @@ -2303,6 +2456,8 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr) // peer again, unless the peer has new info about it. It's important to use the same clock (this is peer's clock) so that // we never compare times from different (and potentially badly sync-ed clocks) + std::cerr << "(EE) stepping in part of the code (" << __PRETTY_FUNCTION__ << ") where we shouldn't. This is a bug." << std::endl; + locked_stampPeerGroupUpdateTime(pid,grpId,tr->mTransaction->updateTS,msgItemL.size()) ; if(grpMeta) @@ -3079,17 +3234,17 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item) RS_STACK_MUTEX(mNxsMutex) ; - RsPeerId peer = item->PeerId(); + RsPeerId peer = item->PeerId(); #ifdef NXS_NET_DEBUG_0 - GXSNETDEBUG_P_(peer) << "HandleRecvSyncGroup(): Service: " << mServType << " from " << peer << ", Last update TS (from myself) sent from peer is T = " << std::dec<< time(NULL) - item->updateTS << " secs ago" << std::endl; + GXSNETDEBUG_P_(peer) << "HandleRecvSyncGroup(): Service: " << mServType << " from " << peer << ", Last update TS (from myself) sent from peer is T = " << std::dec<< time(NULL) - item->updateTS << " secs ago" << std::endl; #endif - if(!locked_CanReceiveUpdate(item)) + if(!locked_CanReceiveUpdate(item)) { #ifdef NXS_NET_DEBUG_0 - GXSNETDEBUG_P_(peer) << " RsGxsNetService::handleRecvSyncGroup() update will not be sent." << std::endl; + GXSNETDEBUG_P_(peer) << " RsGxsNetService::handleRecvSyncGroup() update will not be sent." << std::endl; #endif - return; + return; } std::map grp; @@ -3121,6 +3276,8 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item) { RsGxsGrpMetaData* grpMeta = mit->second; + // Only send info about subscribed groups. + if(grpMeta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED) { @@ -3152,9 +3309,9 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item) } #ifdef NXS_NET_DEBUG_0 - GXSNETDEBUG_P_(peer) << " final list sent (after vetting): " << itemL.size() << " elements." << std::endl; + GXSNETDEBUG_P_(peer) << " final list sent (after vetting): " << itemL.size() << " elements." << std::endl; #endif - locked_pushGrpRespFromList(itemL, peer, transN); + locked_pushGrpRespFromList(itemL, peer, transN); return; } @@ -3437,7 +3594,7 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsg* item) #endif return; } - if(!(grpMeta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED )) + if(!(grpMeta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED )) { #ifdef NXS_NET_DEBUG_0 GXSNETDEBUG_PG(item->PeerId(),item->grpId) << " Grp is not subscribed." << std::endl; diff --git a/libretroshare/src/gxs/rsgxsnetservice.h b/libretroshare/src/gxs/rsgxsnetservice.h index 6aa23d9d8..5ab2ef0b4 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.h +++ b/libretroshare/src/gxs/rsgxsnetservice.h @@ -54,7 +54,8 @@ class RsGroupNetworkStatsRecord RsGroupNetworkStatsRecord() { max_visible_count= 0 ; } std::set suppliers ; - uint32_t max_visible_count ; + uint32_t max_visible_count ; + time_t update_TS ; }; /*! @@ -319,6 +320,12 @@ private: */ void handleRecvSyncGroup(RsNxsSyncGrp* item); + /*! + * Handles an nxs item for group statistics + * @param item contaims update time stamp and number of messages + */ + void handleRecvSyncGrpStatistics(RsNxsSyncGrpStats *grs); + /*! * Handles an nxs item for msgs synchronisation * @param item contaims msg sync info @@ -359,6 +366,7 @@ private: void locked_pushGrpRespFromList(std::list& respList, const RsPeerId& peer, const uint32_t& transN); void locked_pushMsgRespFromList(std::list& itemL, const RsPeerId& sslId, const uint32_t& transN); void syncWithPeers(); + void syncGrpStatistics(); void addGroupItemToList(NxsTransaction*& tr, const RsGxsGroupId& grpId, uint32_t& transN, std::list& reqList); diff --git a/libretroshare/src/serialiser/rsnxsitems.cc b/libretroshare/src/serialiser/rsnxsitems.cc index f9bfaf343..30bc09864 100644 --- a/libretroshare/src/serialiser/rsnxsitems.cc +++ b/libretroshare/src/serialiser/rsnxsitems.cc @@ -330,6 +330,54 @@ bool RsNxsSerialiser::serialiseNxsGrp(RsNxsGrp *item, void *data, uint32_t *size return ok; } + +bool RsNxsSerialiser::serialiseNxsSyncGrpStats(RsNxsSyncGrpStats *item, void *data, uint32_t *size) +{ + +#ifdef RSSERIAL_DEBUG + std::cerr << "RsNxsSerialiser::serialiseNxsSyncGrpStats()" << std::endl; +#endif + + uint32_t tlvsize = sizeNxsSyncGrpStats(item); + uint32_t offset = 0; + + if(*size < tlvsize){ +#ifdef RSSERIAL_DEBUG + std::cerr << "RsNxsSerialiser::serialiseNxsSyncGrpStats()" << std::endl; +#endif + return false; + } + + *size = tlvsize; + + bool ok = true; + + ok &= setRsItemHeader(data, tlvsize, item->PacketId(), tlvsize); + + /* skip the header */ + offset += 8; + + ok &= setRawUInt32(data, *size, &offset, item->request_type); + ok &= item->grpId.serialise(data, *size, offset) ; + ok &= setRawUInt32(data, *size, &offset, item->number_of_posts); + ok &= setRawUInt32(data, *size, &offset, item->last_post_TS); + + if(offset != tlvsize){ +#ifdef RSSERIAL_DEBUG + std::cerr << "RsNxsSerialiser::serialiseSyncGrpStats() FAIL Size Error! " << std::endl; +#endif + ok = false; + } + +#ifdef RSSERIAL_DEBUG + if (!ok) + { + std::cerr << "RsNxsSerialiser::serialiseSyncGrpStats() NOK" << std::endl; + } +#endif + + return ok; +} bool RsNxsSerialiser::serialiseNxsSyncGrp(RsNxsSyncGrp *item, void *data, uint32_t *size) { @@ -707,6 +755,71 @@ RsNxsMsg* RsNxsSerialiser::deserialNxsMsg(void *data, uint32_t *size){ return item; } +RsNxsSyncGrpStats* RsNxsSerialiser::deserialNxsSyncGrpStats(void *data, uint32_t *size){ + +#ifdef RSSERIAL_DEBUG + std::cerr << "RsNxsSerialiser::deserialNxsSyncGrpStats()" << std::endl; +#endif + /* get the type and size */ + uint32_t rstype = getRsItemId(data); + uint32_t rssize = getRsItemSize(data); + + uint32_t offset = 0; + + + if ((RS_PKT_VERSION_SERVICE != getRsItemVersion(rstype)) || (SERVICE_TYPE != getRsItemService(rstype)) || (RS_PKT_SUBTYPE_NXS_SYNC_GRP_STATS != getRsItemSubType(rstype))) + { +#ifdef RSSERIAL_DEBUG + std::cerr << "RsNxsSerialiser::deserialNxsSyncGrpStats() FAIL wrong type" << std::endl; +#endif + return NULL; /* wrong type */ + } + + if (*size < rssize) /* check size */ + { +#ifdef RSSERIAL_DEBUG + std::cerr << "RsNxsSerialiser::deserialNxsSyncGrpStats() FAIL wrong size" << std::endl; +#endif + return NULL; /* not enough data */ + } + + /* set the packet length */ + *size = rssize; + + bool ok = true; + + RsNxsSyncGrpStats* item = new RsNxsSyncGrpStats(getRsItemService(rstype)); + /* skip the header */ + offset += 8; + + ok &= getRawUInt32(data, *size, &offset, &(item->request_type)); + ok &= item->grpId.deserialise(data, *size, offset) ; + ok &= getRawUInt32(data, *size, &offset, &(item->number_of_posts)); + ok &= getRawUInt32(data, *size, &offset, &(item->last_post_TS)); + + if (offset != rssize) + { +#ifdef RSSERIAL_DEBUG + std::cerr << "RsNxsSerialiser::deserialNxsSyncGrpStats() FAIL size mismatch" << std::endl; +#endif + /* error */ + delete item; + return NULL; + } + + if (!ok) + { +#ifdef RSSERIAL_DEBUG + std::cerr << "RsNxsSerialiser::deserialNxsSyncGrpStats() NOK" << std::endl; +#endif + delete item; + return NULL; + } + + return item; +} + + RsNxsSyncGrp* RsNxsSerialiser::deserialNxsSyncGrp(void *data, uint32_t *size){ @@ -1166,7 +1279,17 @@ uint32_t RsNxsSerialiser::sizeNxsSyncGrp(RsNxsSyncGrp *item) return s; } +uint32_t RsNxsSerialiser::sizeNxsSyncGrpStats(RsNxsSyncGrpStats *item) +{ + uint32_t s = 8; // header size + s += 4; // request type + s += item->grpId.serial_size(); + s += 4; // number_of_posts + s += 4; // last_post_TS + + return s; +} uint32_t RsNxsSerialiser::sizeNxsSyncGrpItem(RsNxsSyncGrpItem *item) { @@ -1260,7 +1383,6 @@ void RsNxsSyncMsg::clear() syncHash.clear(); updateTS = 0; } - void RsNxsSyncGrpItem::clear() { flag = 0; @@ -1430,7 +1552,23 @@ std::ostream& RsNxsMsg::print(std::ostream &out, uint16_t indent){ return out; } +std::ostream& RsNxsSyncGrpStats::print(std::ostream &out, uint16_t indent){ + printRsItemBase(out, "RsNxsSyncGrpStats", indent); + uint16_t int_Indent = indent + 2; + + out << "available posts: " << number_of_posts << std::endl; + printIndent(out , int_Indent); + out << "last update: " << last_post_TS << std::endl; + printIndent(out , int_Indent); + out << "group ID: " << grpId << std::endl; + printIndent(out , int_Indent); + out << "request type: " << request_type << std::endl; + printIndent(out , int_Indent); + + printRsItemEnd(out ,"RsNxsSyncGrpStats", indent); + return out; +} std::ostream& RsNxsTransac::print(std::ostream &out, uint16_t indent){ printRsItemBase(out, "RsNxsTransac", indent); diff --git a/libretroshare/src/serialiser/rsnxsitems.h b/libretroshare/src/serialiser/rsnxsitems.h index 01f82d9c8..76a4d91b3 100644 --- a/libretroshare/src/serialiser/rsnxsitems.h +++ b/libretroshare/src/serialiser/rsnxsitems.h @@ -36,15 +36,17 @@ #include "serialiser/rstlvkeys.h" #include "gxs/rsgxsdata.h" +// These items have "flag type" numbers, but this is not used. -const uint8_t RS_PKT_SUBTYPE_NXS_SYNC_GRP = 0x0001; -const uint8_t RS_PKT_SUBTYPE_NXS_SYNC_GRP_ITEM = 0x0002; -const uint8_t RS_PKT_SUBTYPE_NXS_GRP = 0x0004; -const uint8_t RS_PKT_SUBTYPE_NXS_SYNC_MSG_ITEM = 0x0008; -const uint8_t RS_PKT_SUBTYPE_NXS_SYNC_MSG = 0x0010; -const uint8_t RS_PKT_SUBTYPE_NXS_MSG = 0x0020; -const uint8_t RS_PKT_SUBTYPE_NXS_TRANS = 0x0040; -const uint8_t RS_PKT_SUBTYPE_NXS_GRP_PUBLISH_KEY = 0x0080; +const uint8_t RS_PKT_SUBTYPE_NXS_SYNC_GRP = 0x01; +const uint8_t RS_PKT_SUBTYPE_NXS_SYNC_GRP_ITEM = 0x02; +const uint8_t RS_PKT_SUBTYPE_NXS_SYNC_GRP_STATS = 0x03; +const uint8_t RS_PKT_SUBTYPE_NXS_GRP = 0x04; +const uint8_t RS_PKT_SUBTYPE_NXS_SYNC_MSG_ITEM = 0x08; +const uint8_t RS_PKT_SUBTYPE_NXS_SYNC_MSG = 0x10; +const uint8_t RS_PKT_SUBTYPE_NXS_MSG = 0x20; +const uint8_t RS_PKT_SUBTYPE_NXS_TRANS = 0x40; +const uint8_t RS_PKT_SUBTYPE_NXS_GRP_PUBLISH_KEY = 0x80; // possibility create second service to deal with this functionality @@ -53,7 +55,7 @@ const uint8_t RS_PKT_SUBTYPE_EXT_SEARCH_GRP = 0x0001; const uint8_t RS_PKT_SUBTYPE_EXT_SEARCH_MSG = 0x0002; const uint8_t RS_PKT_SUBTYPE_EXT_DELETE_GRP = 0x0004; const uint8_t RS_PKT_SUBTYPE_EXT_DELETE_MSG = 0x0008; -const uint8_t RS_PKT_SUBTYPE_EXT_SEARCH_REQ = 0x0010; +const uint8_t RS_PKT_SUBTYPE_EXT_SEARCH_REQ = 0x0010; /*! @@ -107,6 +109,27 @@ public: }; +/*! + * Use to request statistics about a particular group + */ +class RsNxsSyncGrpStats : public RsNxsItem +{ +public: + + RsNxsSyncGrpStats(uint16_t servtype) : RsNxsItem(servtype, RS_PKT_SUBTYPE_NXS_SYNC_GRP_STATS) {} + + virtual void clear() {} + virtual std::ostream &print(std::ostream &out, uint16_t indent); + + static const uint8_t GROUP_INFO_TYPE_REQUEST = 0x01; + static const uint8_t GROUP_INFO_TYPE_RESPONSE = 0x02; + + uint32_t request_type; // used to determine the type of request + RsGxsGroupId grpId; // id of the group + uint32_t number_of_posts; // number of posts in that group + uint32_t last_post_TS; // time_stamp of last post +}; + /*! * Use to request grp list from peer * Server may advise client peer to use sync file @@ -445,6 +468,10 @@ private: virtual bool serialiseNxsSyncGrp(RsNxsSyncGrp *item, void *data, uint32_t *size); virtual RsNxsSyncGrp* deserialNxsSyncGrp(void *data, uint32_t *size); + virtual uint32_t sizeNxsSyncGrpStats(RsNxsSyncGrpStats* item); + virtual bool serialiseNxsSyncGrpStats(RsNxsSyncGrpStats *item, void *data, uint32_t *size); + virtual RsNxsSyncGrpStats* deserialNxsSyncGrpStats(void *data, uint32_t *size); + /* for RS_PKT_SUBTYPE_SYNC_GRP_ITEM */ virtual uint32_t sizeNxsSyncGrpItem(RsNxsSyncGrpItem* item); From 5fcaa3673661ea67535218a8de49f47d8efb2014 Mon Sep 17 00:00:00 2001 From: csoler Date: Sat, 19 Dec 2015 19:00:06 -0500 Subject: [PATCH 2/3] fixed some serialising bugs, and added proper notification of observer --- libretroshare/src/gxs/rsgxsnetservice.cc | 49 +++++++++++++++------- libretroshare/src/serialiser/rsnxsitems.cc | 11 +++++ 2 files changed, 44 insertions(+), 16 deletions(-) diff --git a/libretroshare/src/gxs/rsgxsnetservice.cc b/libretroshare/src/gxs/rsgxsnetservice.cc index a654ae720..f28ddedc1 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.cc +++ b/libretroshare/src/gxs/rsgxsnetservice.cc @@ -655,8 +655,8 @@ void RsGxsNetService::syncGrpStatistics() mDataStore->retrieveGxsGrpMetaData(grpMeta); - std::set peers; - mNetMgr->getOnlineList(mServiceInfo.mServiceType, peers); + std::set online_peers; + mNetMgr->getOnlineList(mServiceInfo.mServiceType, online_peers); // Go through group statistics and groups without information are re-requested to random peers selected // among the ones who provided the group info. @@ -684,25 +684,28 @@ void RsGxsNetService::syncGrpStatistics() ++rit ; for(uint32_t i=0;ifirst) << " asking friend " << peer_id << " for an update of stats for group " << it->first << std::endl; + GXSNETDEBUG_PG(peer_id,it->first) << " asking friend " << peer_id << " for an update of stats for group " << it->first << std::endl; #endif - RsNxsSyncGrpStats *grs = new RsNxsSyncGrpStats(mServType) ; - - grs->request_type = RsNxsSyncGrpStats::GROUP_INFO_TYPE_REQUEST ; - grs->grpId = it->first ; - grs->PeerId(peer_id) ; + RsNxsSyncGrpStats *grs = new RsNxsSyncGrpStats(mServType) ; - sendItem(grs) ; - } + grs->request_type = RsNxsSyncGrpStats::GROUP_INFO_TYPE_REQUEST ; + grs->grpId = it->first ; + grs->PeerId(peer_id) ; + + sendItem(grs) ; + } + } } #ifdef NXS_NET_DEBUG_0 else @@ -725,6 +728,14 @@ void RsGxsNetService::handleRecvSyncGrpStatistics(RsNxsSyncGrpStats *grs) RsGxsGrpMetaData* grpMeta = grpMetas[grs->grpId]; + if(grpMeta == NULL) + { +#ifdef NXS_NET_DEBUG_0 + GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << " Group is unknown. Not reponding." << std::endl; +#endif + return ; + } + // check if we're subscribed or not if(! (grpMeta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED )) @@ -782,9 +793,15 @@ void RsGxsNetService::handleRecvSyncGrpStatistics(RsNxsSyncGrpStats *grs) RS_STACK_MUTEX(mNxsMutex) ; RsGroupNetworkStatsRecord& rec(mGroupNetworkStats[grs->grpId]) ; + int32_t old_count = rec.max_visible_count ; + int32_t old_suppliers_count = rec.suppliers.size() ; + rec.suppliers.insert(grs->PeerId()) ; rec.max_visible_count = std::max(rec.max_visible_count,grs->number_of_posts) ; rec.update_TS = time(NULL) ; + + if (old_count != rec.max_visible_count || old_suppliers_count != rec.suppliers.size()) + mObserver->notifyChangedGroupStats(grs->grpId); } else std::cerr << "(EE) RsGxsNetService::handleRecvSyncGrpStatistics(): unknown item type " << grs->request_type << " found. This is a bug." << std::endl; diff --git a/libretroshare/src/serialiser/rsnxsitems.cc b/libretroshare/src/serialiser/rsnxsitems.cc index 30bc09864..858760b76 100644 --- a/libretroshare/src/serialiser/rsnxsitems.cc +++ b/libretroshare/src/serialiser/rsnxsitems.cc @@ -38,6 +38,7 @@ uint32_t RsNxsSerialiser::size(RsItem *item) { RsNxsGrp* ngp; RsNxsMsg* nmg; RsNxsSyncGrp* sg; + RsNxsSyncGrpStats* sgs; RsNxsSyncGrpItem* sgl; RsNxsSyncMsg* sgm; RsNxsSyncMsgItem* sgml; @@ -52,6 +53,9 @@ uint32_t RsNxsSerialiser::size(RsItem *item) { { return sizeNxsSyncGrp(sg); + } else if((sgs = dynamic_cast(item)) != NULL) + { + return sizeNxsSyncGrpStats(sgs); }else if(( ntx = dynamic_cast(item)) != NULL){ return sizeNxsTrans(ntx); } @@ -109,6 +113,8 @@ RsItem* RsNxsSerialiser::deserialise(void *data, uint32_t *size) { return deserialNxsSyncMsgItem(data, size); case RS_PKT_SUBTYPE_NXS_GRP: return deserialNxsGrp(data, size); + case RS_PKT_SUBTYPE_NXS_SYNC_GRP_STATS: + return deserialNxsSyncGrpStats(data, size); case RS_PKT_SUBTYPE_NXS_MSG: return deserialNxsMsg(data, size); case RS_PKT_SUBTYPE_NXS_TRANS: @@ -134,6 +140,7 @@ bool RsNxsSerialiser::serialise(RsItem *item, void *data, uint32_t *size){ RsNxsGrp* ngp; RsNxsMsg* nmg; RsNxsSyncGrp* sg; + RsNxsSyncGrpStats* sgs; RsNxsSyncGrpItem* sgl; RsNxsSyncMsg* sgm; RsNxsSyncMsgItem* sgml; @@ -144,6 +151,10 @@ bool RsNxsSerialiser::serialise(RsItem *item, void *data, uint32_t *size){ { return serialiseNxsSyncGrp(sg, data, size); + }else if((sgs = dynamic_cast(item)) != NULL) + { + return serialiseNxsSyncGrpStats(sgs, data, size); + }else if ((ntx = dynamic_cast(item)) != NULL) { return serialiseNxsTrans(ntx, data, size); From ccc5f0f8fc28f559c95ae99ccbb72b977b403fd4 Mon Sep 17 00:00:00 2001 From: csoler Date: Sat, 19 Dec 2015 19:04:49 -0500 Subject: [PATCH 3/3] improved debug info in rsgxsnetservice --- libretroshare/src/gxs/rsgxsnetservice.cc | 35 ++++++++++++------------ 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/libretroshare/src/gxs/rsgxsnetservice.cc b/libretroshare/src/gxs/rsgxsnetservice.cc index f28ddedc1..b7c208b0a 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.cc +++ b/libretroshare/src/gxs/rsgxsnetservice.cc @@ -207,12 +207,13 @@ NXS_NET_DEBUG_4 vetting NXS_NET_DEBUG_5 summary of transactions (useful to just know what comes in/out) ***/ -#define NXS_NET_DEBUG_0 1 +//#define NXS_NET_DEBUG_0 1 //#define NXS_NET_DEBUG_1 1 //#define NXS_NET_DEBUG_2 1 //#define NXS_NET_DEBUG_3 1 //#define NXS_NET_DEBUG_4 1 -#define NXS_NET_DEBUG_5 1 +//#define NXS_NET_DEBUG_5 1 +//#define NXS_NET_DEBUG_6 1 #define GIXS_CUT_OFF 0 @@ -234,7 +235,7 @@ // Debug system to allow to print only for some IDs (group, Peer, etc) -#if defined(NXS_NET_DEBUG_0) || defined(NXS_NET_DEBUG_1) || defined(NXS_NET_DEBUG_2) || defined(NXS_NET_DEBUG_3) || defined(NXS_NET_DEBUG_4) || defined(NXS_NET_DEBUG_5) +#if defined(NXS_NET_DEBUG_0) || defined(NXS_NET_DEBUG_1) || defined(NXS_NET_DEBUG_2) || defined(NXS_NET_DEBUG_3) || defined(NXS_NET_DEBUG_4) || defined(NXS_NET_DEBUG_5) || defined(NXS_NET_DEBUG_6) static const RsPeerId peer_to_print = RsPeerId(std::string("")) ; static const RsGxsGroupId group_id_to_print = RsGxsGroupId(std::string("" )) ; // use this to allow to this group id only, or "" for all IDs @@ -243,7 +244,7 @@ static const uint32_t service_to_print = 0 ; // use class nullstream: public std::ostream {}; -#if defined(NXS_NET_DEBUG_0) || defined(NXS_NET_DEBUG_1) || defined(NXS_NET_DEBUG_2) || defined(NXS_NET_DEBUG_3) || defined(NXS_NET_DEBUG_4) || defined(NXS_NET_DEBUG_5) +#if defined(NXS_NET_DEBUG_0) || defined(NXS_NET_DEBUG_1) || defined(NXS_NET_DEBUG_2) || defined(NXS_NET_DEBUG_3) || defined(NXS_NET_DEBUG_4) || defined(NXS_NET_DEBUG_5)|| defined(NXS_NET_DEBUG_6) static std::string nice_time_stamp(time_t now,time_t TS) { if(TS == 0) @@ -647,7 +648,7 @@ void RsGxsNetService::syncGrpStatistics() { RS_STACK_MUTEX(mNxsMutex) ; -#ifdef NXS_NET_DEBUG_0 +#ifdef NXS_NET_DEBUG_6 GXSNETDEBUG___<< "Sync-ing group statistics." << std::endl; #endif typedef std::map GrpMetaMap; @@ -666,13 +667,13 @@ void RsGxsNetService::syncGrpStatistics() for(std::map::iterator it(grpMeta.begin());it!=grpMeta.end();++it) { RsGroupNetworkStatsRecord& rec(mGroupNetworkStats[it->first]) ; -#ifdef NXS_NET_DEBUG_0 +#ifdef NXS_NET_DEBUG_6 GXSNETDEBUG__G(it->first) << " group " << it->first ; #endif if(rec.update_TS + GROUP_STATS_UPDATE_DELAY < now && rec.suppliers.size() > 0) { -#ifdef NXS_NET_DEBUG_0 +#ifdef NXS_NET_DEBUG_6 GXSNETDEBUG__G(it->first) << " needs update. Randomly asking to some friends" << std::endl; #endif // randomly select 2 friends among the suppliers of this group @@ -680,7 +681,7 @@ void RsGxsNetService::syncGrpStatistics() uint32_t n = RSRandom::random_u32() % rec.suppliers.size() ; std::set::const_iterator rit = rec.suppliers.begin(); - for(int i=0;ifirst) << " asking friend " << peer_id << " for an update of stats for group " << it->first << std::endl; #endif @@ -707,7 +708,7 @@ void RsGxsNetService::syncGrpStatistics() } } } -#ifdef NXS_NET_DEBUG_0 +#ifdef NXS_NET_DEBUG_6 else GXSNETDEBUG__G(it->first) << " up to date." << std::endl; #endif @@ -718,7 +719,7 @@ void RsGxsNetService::handleRecvSyncGrpStatistics(RsNxsSyncGrpStats *grs) { if(grs->request_type == RsNxsSyncGrpStats::GROUP_INFO_TYPE_REQUEST) { -#ifdef NXS_NET_DEBUG_0 +#ifdef NXS_NET_DEBUG_6 GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << "Received Grp update stats Request for group " << grs->grpId << " from friend " << grs->PeerId() << std::endl; #endif std::map grpMetas; @@ -730,7 +731,7 @@ void RsGxsNetService::handleRecvSyncGrpStatistics(RsNxsSyncGrpStats *grs) if(grpMeta == NULL) { -#ifdef NXS_NET_DEBUG_0 +#ifdef NXS_NET_DEBUG_6 GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << " Group is unknown. Not reponding." << std::endl; #endif return ; @@ -740,7 +741,7 @@ void RsGxsNetService::handleRecvSyncGrpStatistics(RsNxsSyncGrpStats *grs) if(! (grpMeta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED )) { -#ifdef NXS_NET_DEBUG_0 +#ifdef NXS_NET_DEBUG_6 GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << " Group is not subscribed. Not reponding." << std::endl; #endif delete grpMeta ; @@ -754,7 +755,7 @@ void RsGxsNetService::handleRecvSyncGrpStatistics(RsNxsSyncGrpStats *grs) reqIds[grs->grpId] = std::vector(); GxsMsgMetaResult result; -#ifdef NXS_NET_DEBUG_0 +#ifdef NXS_NET_DEBUG_6 GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << " retrieving message information." << std::endl; #endif mDataStore->retrieveGxsMsgMetaData(reqIds, result); @@ -779,7 +780,7 @@ void RsGxsNetService::handleRecvSyncGrpStatistics(RsNxsSyncGrpStats *grs) delete vec[i] ; } -#ifdef NXS_NET_DEBUG_0 +#ifdef NXS_NET_DEBUG_6 GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << " sending back statistics item with " << vec.size() << " elements." << std::endl; #endif @@ -787,7 +788,7 @@ void RsGxsNetService::handleRecvSyncGrpStatistics(RsNxsSyncGrpStats *grs) } else if(grs->request_type == RsNxsSyncGrpStats::GROUP_INFO_TYPE_RESPONSE) { -#ifdef NXS_NET_DEBUG_0 +#ifdef NXS_NET_DEBUG_6 GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << "Received Grp update stats item from peer " << grs->PeerId() << " for group " << grs->grpId << ", reporting " << grs->number_of_posts << " posts." << std::endl; #endif RS_STACK_MUTEX(mNxsMutex) ; @@ -1667,8 +1668,6 @@ void RsGxsNetService::updateClientSyncTS() GXSNETDEBUG___<< "updateClientSyncTS(): checking last modification time stamps of local data w.r.t. client's modification times" << std::endl; #endif - time_t now = time(NULL) ; - if(mGrpServerUpdateItem == NULL) mGrpServerUpdateItem = new RsGxsServerGrpUpdateItem(mServType);