mirror of
https://github.com/RetroShare/RetroShare.git
synced 2025-01-03 20:01:07 -05:00
added stats exchange system to gather number of posts in unsubscribed groups without the need to actually DL the messages (reduced bw a lot)
This commit is contained in:
parent
e8b881b2f1
commit
6910ad3695
@ -1552,7 +1552,7 @@ bool RsGxsDataAccess::getGroupStatistic(GroupStatisticRequest *req)
|
|||||||
req->mGroupStatistic.mNumChildMsgsNew = 0;
|
req->mGroupStatistic.mNumChildMsgsNew = 0;
|
||||||
req->mGroupStatistic.mNumChildMsgsUnread = 0;
|
req->mGroupStatistic.mNumChildMsgsUnread = 0;
|
||||||
|
|
||||||
for(int i = 0; i < msgMetaV.size(); ++i)
|
for(uint32_t i = 0; i < msgMetaV.size(); ++i)
|
||||||
{
|
{
|
||||||
RsGxsMsgMetaData* m = msgMetaV[i];
|
RsGxsMsgMetaData* m = msgMetaV[i];
|
||||||
req->mGroupStatistic.mTotalSizeOfMsgs += m->mMsgSize + m->serial_size();
|
req->mGroupStatistic.mTotalSizeOfMsgs += m->mMsgSize + m->serial_size();
|
||||||
|
@ -207,12 +207,12 @@
|
|||||||
NXS_NET_DEBUG_4 vetting
|
NXS_NET_DEBUG_4 vetting
|
||||||
NXS_NET_DEBUG_5 summary of transactions (useful to just know what comes in/out)
|
NXS_NET_DEBUG_5 summary of transactions (useful to just know what comes in/out)
|
||||||
***/
|
***/
|
||||||
//#define NXS_NET_DEBUG_0 1
|
#define NXS_NET_DEBUG_0 1
|
||||||
//#define NXS_NET_DEBUG_1 1
|
//#define NXS_NET_DEBUG_1 1
|
||||||
//#define NXS_NET_DEBUG_2 1
|
//#define NXS_NET_DEBUG_2 1
|
||||||
//#define NXS_NET_DEBUG_3 1
|
//#define NXS_NET_DEBUG_3 1
|
||||||
//#define NXS_NET_DEBUG_4 1
|
//#define NXS_NET_DEBUG_4 1
|
||||||
//#define NXS_NET_DEBUG_5 1
|
#define NXS_NET_DEBUG_5 1
|
||||||
|
|
||||||
#define GIXS_CUT_OFF 0
|
#define GIXS_CUT_OFF 0
|
||||||
|
|
||||||
@ -224,11 +224,13 @@
|
|||||||
// A small value for MAX_REQLIST_SIZE is likely to help messages to propagate in a chaotic network, but will also slow them down.
|
// A small value for MAX_REQLIST_SIZE is likely to help messages to propagate in a chaotic network, but will also slow them down.
|
||||||
// A small SYNC_PERIOD fasten message propagation, but is likely to overload the server side of transactions (e.g. overload outqueues).
|
// A small SYNC_PERIOD fasten message propagation, but is likely to overload the server side of transactions (e.g. overload outqueues).
|
||||||
//
|
//
|
||||||
#define SYNC_PERIOD 60
|
#define SYNC_PERIOD 60
|
||||||
#define MAX_REQLIST_SIZE 20 // No more than 20 items per msg request list => creates smaller transactions that are less likely to be cancelled.
|
#define MAX_REQLIST_SIZE 20 // No more than 20 items per msg request list => creates smaller transactions that are less likely to be cancelled.
|
||||||
#define TRANSAC_TIMEOUT 2000 // In seconds. Has been increased to avoid epidemic transaction cancelling due to overloaded outqueues.
|
#define TRANSAC_TIMEOUT 2000 // In seconds. Has been increased to avoid epidemic transaction cancelling due to overloaded outqueues.
|
||||||
#define SECURITY_DELAY_TO_FORCE_CLIENT_REUPDATE 3600 // force re-update if there happens to be a large delay between our server side TS and the client side TS of friends
|
#define SECURITY_DELAY_TO_FORCE_CLIENT_REUPDATE 3600 // force re-update if there happens to be a large delay between our server side TS and the client side TS of friends
|
||||||
#define REJECTED_MESSAGE_RETRY_DELAY 24*3600 // re-try rejected messages every 24hrs. Most of the time this is because the peer's reputation has changed.
|
#define REJECTED_MESSAGE_RETRY_DELAY 24*3600 // re-try rejected messages every 24hrs. Most of the time this is because the peer's reputation has changed.
|
||||||
|
#define GROUP_STATS_UPDATE_DELAY 1800 // update unsubscribed group statistics every 30 mins
|
||||||
|
#define GROUP_STATS_UPDATE_NB_PEERS 2 // update unsubscribed group statistics every 30 mins
|
||||||
|
|
||||||
// Debug system to allow to print only for some IDs (group, Peer, etc)
|
// Debug system to allow to print only for some IDs (group, Peer, etc)
|
||||||
|
|
||||||
@ -323,6 +325,8 @@ int RsGxsNetService::tick()
|
|||||||
if((elapsed) < now)
|
if((elapsed) < now)
|
||||||
{
|
{
|
||||||
syncWithPeers();
|
syncWithPeers();
|
||||||
|
syncGrpStatistics();
|
||||||
|
|
||||||
mSyncTs = now;
|
mSyncTs = now;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -332,13 +336,12 @@ int RsGxsNetService::tick()
|
|||||||
|
|
||||||
mLastKeyPublishTs = now ;
|
mLastKeyPublishTs = now ;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(now > 3600 + mLastCleanRejectedMessages)
|
if(now > 3600 + mLastCleanRejectedMessages)
|
||||||
{
|
{
|
||||||
sharePublishKeysPending() ;
|
|
||||||
|
|
||||||
mLastCleanRejectedMessages = now ;
|
mLastCleanRejectedMessages = now ;
|
||||||
}
|
|
||||||
cleanRejectedMessages() ;
|
cleanRejectedMessages() ;
|
||||||
|
}
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
@ -543,14 +546,14 @@ void RsGxsNetService::syncWithPeers()
|
|||||||
|
|
||||||
for(; mit != grpMeta.end(); ++mit)
|
for(; mit != grpMeta.end(); ++mit)
|
||||||
{
|
{
|
||||||
RsGxsGrpMetaData* meta = mit->second;
|
RsGxsGrpMetaData* meta = mit->second;
|
||||||
|
|
||||||
// This was commented out because we want to know how many messages are available for unsubscribed groups.
|
// This was commented out because we want to know how many messages are available for unsubscribed groups.
|
||||||
// if(meta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED )
|
|
||||||
// {
|
if(meta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED )
|
||||||
toRequest.insert(std::make_pair(mit->first, meta));
|
toRequest.insert(std::make_pair(mit->first, meta));
|
||||||
// }else
|
else
|
||||||
// delete meta;
|
delete meta;
|
||||||
}
|
}
|
||||||
|
|
||||||
grpMeta.clear();
|
grpMeta.clear();
|
||||||
@ -640,6 +643,153 @@ void RsGxsNetService::syncWithPeers()
|
|||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void RsGxsNetService::syncGrpStatistics()
|
||||||
|
{
|
||||||
|
RS_STACK_MUTEX(mNxsMutex) ;
|
||||||
|
|
||||||
|
#ifdef NXS_NET_DEBUG_0
|
||||||
|
GXSNETDEBUG___<< "Sync-ing group statistics." << std::endl;
|
||||||
|
#endif
|
||||||
|
typedef std::map<RsGxsGroupId, RsGxsGrpMetaData* > GrpMetaMap;
|
||||||
|
GrpMetaMap grpMeta;
|
||||||
|
|
||||||
|
mDataStore->retrieveGxsGrpMetaData(grpMeta);
|
||||||
|
|
||||||
|
std::set<RsPeerId> peers;
|
||||||
|
mNetMgr->getOnlineList(mServiceInfo.mServiceType, peers);
|
||||||
|
|
||||||
|
// Go through group statistics and groups without information are re-requested to random peers selected
|
||||||
|
// among the ones who provided the group info.
|
||||||
|
|
||||||
|
time_t now = time(NULL) ;
|
||||||
|
|
||||||
|
for(std::map<RsGxsGroupId,RsGxsGrpMetaData*>::iterator it(grpMeta.begin());it!=grpMeta.end();++it)
|
||||||
|
{
|
||||||
|
RsGroupNetworkStatsRecord& rec(mGroupNetworkStats[it->first]) ;
|
||||||
|
#ifdef NXS_NET_DEBUG_0
|
||||||
|
GXSNETDEBUG__G(it->first) << " group " << it->first ;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
if(rec.update_TS + GROUP_STATS_UPDATE_DELAY < now && rec.suppliers.size() > 0)
|
||||||
|
{
|
||||||
|
#ifdef NXS_NET_DEBUG_0
|
||||||
|
GXSNETDEBUG__G(it->first) << " needs update. Randomly asking to some friends" << std::endl;
|
||||||
|
#endif
|
||||||
|
// randomly select 2 friends among the suppliers of this group
|
||||||
|
|
||||||
|
uint32_t n = RSRandom::random_u32() % rec.suppliers.size() ;
|
||||||
|
|
||||||
|
std::set<RsPeerId>::const_iterator rit = rec.suppliers.begin();
|
||||||
|
for(int i=0;i<n;++i)
|
||||||
|
++rit ;
|
||||||
|
|
||||||
|
for(uint32_t i=0;i<std::min(rec.suppliers.size(),(size_t)GROUP_STATS_UPDATE_NB_PEERS);++i)
|
||||||
|
{
|
||||||
|
RsPeerId peer_id = *rit ;
|
||||||
|
|
||||||
|
++rit ;
|
||||||
|
if(rit == rec.suppliers.end())
|
||||||
|
rit = rec.suppliers.begin() ;
|
||||||
|
|
||||||
|
#ifdef NXS_NET_DEBUG_0
|
||||||
|
GXSNETDEBUG_PG(peer_id,it->first) << " asking friend " << peer_id << " for an update of stats for group " << it->first << std::endl;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
RsNxsSyncGrpStats *grs = new RsNxsSyncGrpStats(mServType) ;
|
||||||
|
|
||||||
|
grs->request_type = RsNxsSyncGrpStats::GROUP_INFO_TYPE_REQUEST ;
|
||||||
|
grs->grpId = it->first ;
|
||||||
|
grs->PeerId(peer_id) ;
|
||||||
|
|
||||||
|
sendItem(grs) ;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#ifdef NXS_NET_DEBUG_0
|
||||||
|
else
|
||||||
|
GXSNETDEBUG__G(it->first) << " up to date." << std::endl;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void RsGxsNetService::handleRecvSyncGrpStatistics(RsNxsSyncGrpStats *grs)
|
||||||
|
{
|
||||||
|
if(grs->request_type == RsNxsSyncGrpStats::GROUP_INFO_TYPE_REQUEST)
|
||||||
|
{
|
||||||
|
#ifdef NXS_NET_DEBUG_0
|
||||||
|
GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << "Received Grp update stats Request for group " << grs->grpId << " from friend " << grs->PeerId() << std::endl;
|
||||||
|
#endif
|
||||||
|
std::map<RsGxsGroupId, RsGxsGrpMetaData*> grpMetas;
|
||||||
|
grpMetas[grs->grpId] = NULL;
|
||||||
|
|
||||||
|
mDataStore->retrieveGxsGrpMetaData(grpMetas);
|
||||||
|
|
||||||
|
RsGxsGrpMetaData* grpMeta = grpMetas[grs->grpId];
|
||||||
|
|
||||||
|
// check if we're subscribed or not
|
||||||
|
|
||||||
|
if(! (grpMeta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED ))
|
||||||
|
{
|
||||||
|
#ifdef NXS_NET_DEBUG_0
|
||||||
|
GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << " Group is not subscribed. Not reponding." << std::endl;
|
||||||
|
#endif
|
||||||
|
delete grpMeta ;
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
delete grpMeta ;
|
||||||
|
|
||||||
|
// now count available messages
|
||||||
|
|
||||||
|
GxsMsgReq reqIds;
|
||||||
|
reqIds[grs->grpId] = std::vector<RsGxsMessageId>();
|
||||||
|
GxsMsgMetaResult result;
|
||||||
|
|
||||||
|
#ifdef NXS_NET_DEBUG_0
|
||||||
|
GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << " retrieving message information." << std::endl;
|
||||||
|
#endif
|
||||||
|
mDataStore->retrieveGxsMsgMetaData(reqIds, result);
|
||||||
|
|
||||||
|
const std::vector<RsGxsMsgMetaData*>& vec(result[grs->grpId]) ;
|
||||||
|
|
||||||
|
if(vec.empty()) // that means we don't have any, or there isn't any, but since the default is always 0, no need to send.
|
||||||
|
return ;
|
||||||
|
|
||||||
|
RsNxsSyncGrpStats *grs_resp = new RsNxsSyncGrpStats(mServType) ;
|
||||||
|
grs_resp->request_type = RsNxsSyncGrpStats::GROUP_INFO_TYPE_RESPONSE ;
|
||||||
|
grs_resp->number_of_posts = vec.size();
|
||||||
|
grs_resp->grpId = grs->grpId;
|
||||||
|
grs_resp->PeerId(grs->PeerId()) ;
|
||||||
|
|
||||||
|
grs_resp->last_post_TS = 0 ;
|
||||||
|
|
||||||
|
for(uint32_t i=0;i<vec.size();++i)
|
||||||
|
{
|
||||||
|
if(grs_resp->last_post_TS < vec[i]->mPublishTs)
|
||||||
|
grs_resp->last_post_TS = vec[i]->mPublishTs;
|
||||||
|
|
||||||
|
delete vec[i] ;
|
||||||
|
}
|
||||||
|
#ifdef NXS_NET_DEBUG_0
|
||||||
|
GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << " sending back statistics item with " << vec.size() << " elements." << std::endl;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
sendItem(grs_resp) ;
|
||||||
|
}
|
||||||
|
else if(grs->request_type == RsNxsSyncGrpStats::GROUP_INFO_TYPE_RESPONSE)
|
||||||
|
{
|
||||||
|
#ifdef NXS_NET_DEBUG_0
|
||||||
|
GXSNETDEBUG_PG(grs->PeerId(),grs->grpId) << "Received Grp update stats item from peer " << grs->PeerId() << " for group " << grs->grpId << ", reporting " << grs->number_of_posts << " posts." << std::endl;
|
||||||
|
#endif
|
||||||
|
RS_STACK_MUTEX(mNxsMutex) ;
|
||||||
|
RsGroupNetworkStatsRecord& rec(mGroupNetworkStats[grs->grpId]) ;
|
||||||
|
|
||||||
|
rec.suppliers.insert(grs->PeerId()) ;
|
||||||
|
rec.max_visible_count = std::max(rec.max_visible_count,grs->number_of_posts) ;
|
||||||
|
rec.update_TS = time(NULL) ;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
std::cerr << "(EE) RsGxsNetService::handleRecvSyncGrpStatistics(): unknown item type " << grs->request_type << " found. This is a bug." << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
void RsGxsNetService::subscribeStatusChanged(const RsGxsGroupId& grpId,bool subscribed)
|
void RsGxsNetService::subscribeStatusChanged(const RsGxsGroupId& grpId,bool subscribed)
|
||||||
{
|
{
|
||||||
RS_STACK_MUTEX(mNxsMutex) ;
|
RS_STACK_MUTEX(mNxsMutex) ;
|
||||||
@ -1118,27 +1268,29 @@ private:
|
|||||||
|
|
||||||
bool RsGxsNetService::loadList(std::list<RsItem *> &load)
|
bool RsGxsNetService::loadList(std::list<RsItem *> &load)
|
||||||
{
|
{
|
||||||
RS_STACK_MUTEX(mNxsMutex) ;
|
RS_STACK_MUTEX(mNxsMutex) ;
|
||||||
|
|
||||||
std::for_each(load.begin(), load.end(), StoreHere(mClientGrpUpdateMap, mClientMsgUpdateMap, mServerMsgUpdateMap, mGrpServerUpdateItem));
|
std::for_each(load.begin(), load.end(), StoreHere(mClientGrpUpdateMap, mClientMsgUpdateMap, mServerMsgUpdateMap, mGrpServerUpdateItem));
|
||||||
|
time_t now = time(NULL);
|
||||||
|
|
||||||
for(ClientMsgMap::iterator it = mClientMsgUpdateMap.begin();it!=mClientMsgUpdateMap.end();++it)
|
for(ClientMsgMap::iterator it = mClientMsgUpdateMap.begin();it!=mClientMsgUpdateMap.end();++it)
|
||||||
for(std::map<RsGxsGroupId,RsGxsMsgUpdateItem::MsgUpdateInfo>::const_iterator it2(it->second->msgUpdateInfos.begin());it2!=it->second->msgUpdateInfos.end();++it2)
|
for(std::map<RsGxsGroupId,RsGxsMsgUpdateItem::MsgUpdateInfo>::const_iterator it2(it->second->msgUpdateInfos.begin());it2!=it->second->msgUpdateInfos.end();++it2)
|
||||||
{
|
{
|
||||||
RsGroupNetworkStatsRecord& gnsr = mGroupNetworkStats[it2->first] ;
|
RsGroupNetworkStatsRecord& gnsr = mGroupNetworkStats[it2->first] ;
|
||||||
|
|
||||||
// At each reload, divide the last count by 2. This gradually flushes old information away.
|
// At each reload, divide the last count by 2. This gradually flushes old information away.
|
||||||
|
|
||||||
gnsr.max_visible_count = std::max(it2->second.message_count,gnsr.max_visible_count/2) ;
|
gnsr.max_visible_count = std::max(it2->second.message_count,gnsr.max_visible_count/2) ;
|
||||||
|
gnsr.update_TS = now - GROUP_STATS_UPDATE_DELAY + (RSRandom::random_u32()%(GROUP_STATS_UPDATE_DELAY/10)) ;
|
||||||
|
|
||||||
// Similarly, we remove some of the suppliers randomly. If they are
|
// Similarly, we remove some of the suppliers randomly. If they are
|
||||||
// actual suppliers, they will come back automatically. If they are
|
// actual suppliers, they will come back automatically. If they are
|
||||||
// not, they will be forgotten.
|
// not, they will be forgotten.
|
||||||
|
|
||||||
if(RSRandom::random_f32() > 0.2)
|
if(RSRandom::random_f32() > 0.2)
|
||||||
gnsr.suppliers.insert(it->first) ;
|
gnsr.suppliers.insert(it->first) ;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
@ -1206,9 +1358,10 @@ void RsGxsNetService::recvNxsItemQueue()
|
|||||||
|
|
||||||
switch(ni->PacketSubType())
|
switch(ni->PacketSubType())
|
||||||
{
|
{
|
||||||
case RS_PKT_SUBTYPE_NXS_SYNC_GRP: handleRecvSyncGroup (dynamic_cast<RsNxsSyncGrp*>(ni)) ; break ;
|
case RS_PKT_SUBTYPE_NXS_SYNC_GRP_STATS: handleRecvSyncGrpStatistics (dynamic_cast<RsNxsSyncGrpStats*>(ni)) ; break ;
|
||||||
case RS_PKT_SUBTYPE_NXS_SYNC_MSG: handleRecvSyncMessage (dynamic_cast<RsNxsSyncMsg*>(ni)) ; break ;
|
case RS_PKT_SUBTYPE_NXS_SYNC_GRP: handleRecvSyncGroup (dynamic_cast<RsNxsSyncGrp*>(ni)) ; break ;
|
||||||
case RS_PKT_SUBTYPE_NXS_GRP_PUBLISH_KEY: handleRecvPublishKeys (dynamic_cast<RsNxsGroupPublishKeyItem*>(ni)) ; break ;
|
case RS_PKT_SUBTYPE_NXS_SYNC_MSG: handleRecvSyncMessage (dynamic_cast<RsNxsSyncMsg*>(ni)) ; break ;
|
||||||
|
case RS_PKT_SUBTYPE_NXS_GRP_PUBLISH_KEY: handleRecvPublishKeys (dynamic_cast<RsNxsGroupPublishKeyItem*>(ni)) ; break ;
|
||||||
default:
|
default:
|
||||||
std::cerr << "Unhandled item subtype " << (uint32_t) ni->PacketSubType() << " in RsGxsNetService: " << std::endl; break;
|
std::cerr << "Unhandled item subtype " << (uint32_t) ni->PacketSubType() << " in RsGxsNetService: " << std::endl; break;
|
||||||
}
|
}
|
||||||
@ -2303,6 +2456,8 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
|
|||||||
// peer again, unless the peer has new info about it. It's important to use the same clock (this is peer's clock) so that
|
// peer again, unless the peer has new info about it. It's important to use the same clock (this is peer's clock) so that
|
||||||
// we never compare times from different (and potentially badly sync-ed clocks)
|
// we never compare times from different (and potentially badly sync-ed clocks)
|
||||||
|
|
||||||
|
std::cerr << "(EE) stepping in part of the code (" << __PRETTY_FUNCTION__ << ") where we shouldn't. This is a bug." << std::endl;
|
||||||
|
|
||||||
locked_stampPeerGroupUpdateTime(pid,grpId,tr->mTransaction->updateTS,msgItemL.size()) ;
|
locked_stampPeerGroupUpdateTime(pid,grpId,tr->mTransaction->updateTS,msgItemL.size()) ;
|
||||||
|
|
||||||
if(grpMeta)
|
if(grpMeta)
|
||||||
@ -3079,17 +3234,17 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item)
|
|||||||
|
|
||||||
RS_STACK_MUTEX(mNxsMutex) ;
|
RS_STACK_MUTEX(mNxsMutex) ;
|
||||||
|
|
||||||
RsPeerId peer = item->PeerId();
|
RsPeerId peer = item->PeerId();
|
||||||
#ifdef NXS_NET_DEBUG_0
|
#ifdef NXS_NET_DEBUG_0
|
||||||
GXSNETDEBUG_P_(peer) << "HandleRecvSyncGroup(): Service: " << mServType << " from " << peer << ", Last update TS (from myself) sent from peer is T = " << std::dec<< time(NULL) - item->updateTS << " secs ago" << std::endl;
|
GXSNETDEBUG_P_(peer) << "HandleRecvSyncGroup(): Service: " << mServType << " from " << peer << ", Last update TS (from myself) sent from peer is T = " << std::dec<< time(NULL) - item->updateTS << " secs ago" << std::endl;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if(!locked_CanReceiveUpdate(item))
|
if(!locked_CanReceiveUpdate(item))
|
||||||
{
|
{
|
||||||
#ifdef NXS_NET_DEBUG_0
|
#ifdef NXS_NET_DEBUG_0
|
||||||
GXSNETDEBUG_P_(peer) << " RsGxsNetService::handleRecvSyncGroup() update will not be sent." << std::endl;
|
GXSNETDEBUG_P_(peer) << " RsGxsNetService::handleRecvSyncGroup() update will not be sent." << std::endl;
|
||||||
#endif
|
#endif
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::map<RsGxsGroupId, RsGxsGrpMetaData*> grp;
|
std::map<RsGxsGroupId, RsGxsGrpMetaData*> grp;
|
||||||
@ -3121,6 +3276,8 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item)
|
|||||||
{
|
{
|
||||||
RsGxsGrpMetaData* grpMeta = mit->second;
|
RsGxsGrpMetaData* grpMeta = mit->second;
|
||||||
|
|
||||||
|
// Only send info about subscribed groups.
|
||||||
|
|
||||||
if(grpMeta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED)
|
if(grpMeta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED)
|
||||||
{
|
{
|
||||||
|
|
||||||
@ -3152,9 +3309,9 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrp* item)
|
|||||||
}
|
}
|
||||||
|
|
||||||
#ifdef NXS_NET_DEBUG_0
|
#ifdef NXS_NET_DEBUG_0
|
||||||
GXSNETDEBUG_P_(peer) << " final list sent (after vetting): " << itemL.size() << " elements." << std::endl;
|
GXSNETDEBUG_P_(peer) << " final list sent (after vetting): " << itemL.size() << " elements." << std::endl;
|
||||||
#endif
|
#endif
|
||||||
locked_pushGrpRespFromList(itemL, peer, transN);
|
locked_pushGrpRespFromList(itemL, peer, transN);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -3437,7 +3594,7 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsg* item)
|
|||||||
#endif
|
#endif
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if(!(grpMeta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED ))
|
if(!(grpMeta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED ))
|
||||||
{
|
{
|
||||||
#ifdef NXS_NET_DEBUG_0
|
#ifdef NXS_NET_DEBUG_0
|
||||||
GXSNETDEBUG_PG(item->PeerId(),item->grpId) << " Grp is not subscribed." << std::endl;
|
GXSNETDEBUG_PG(item->PeerId(),item->grpId) << " Grp is not subscribed." << std::endl;
|
||||||
|
@ -54,7 +54,8 @@ class RsGroupNetworkStatsRecord
|
|||||||
RsGroupNetworkStatsRecord() { max_visible_count= 0 ; }
|
RsGroupNetworkStatsRecord() { max_visible_count= 0 ; }
|
||||||
|
|
||||||
std::set<RsPeerId> suppliers ;
|
std::set<RsPeerId> suppliers ;
|
||||||
uint32_t max_visible_count ;
|
uint32_t max_visible_count ;
|
||||||
|
time_t update_TS ;
|
||||||
};
|
};
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
@ -319,6 +320,12 @@ private:
|
|||||||
*/
|
*/
|
||||||
void handleRecvSyncGroup(RsNxsSyncGrp* item);
|
void handleRecvSyncGroup(RsNxsSyncGrp* item);
|
||||||
|
|
||||||
|
/*!
|
||||||
|
* Handles an nxs item for group statistics
|
||||||
|
* @param item contaims update time stamp and number of messages
|
||||||
|
*/
|
||||||
|
void handleRecvSyncGrpStatistics(RsNxsSyncGrpStats *grs);
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* Handles an nxs item for msgs synchronisation
|
* Handles an nxs item for msgs synchronisation
|
||||||
* @param item contaims msg sync info
|
* @param item contaims msg sync info
|
||||||
@ -359,6 +366,7 @@ private:
|
|||||||
void locked_pushGrpRespFromList(std::list<RsNxsItem*>& respList, const RsPeerId& peer, const uint32_t& transN);
|
void locked_pushGrpRespFromList(std::list<RsNxsItem*>& respList, const RsPeerId& peer, const uint32_t& transN);
|
||||||
void locked_pushMsgRespFromList(std::list<RsNxsItem*>& itemL, const RsPeerId& sslId, const uint32_t& transN);
|
void locked_pushMsgRespFromList(std::list<RsNxsItem*>& itemL, const RsPeerId& sslId, const uint32_t& transN);
|
||||||
void syncWithPeers();
|
void syncWithPeers();
|
||||||
|
void syncGrpStatistics();
|
||||||
void addGroupItemToList(NxsTransaction*& tr,
|
void addGroupItemToList(NxsTransaction*& tr,
|
||||||
const RsGxsGroupId& grpId, uint32_t& transN,
|
const RsGxsGroupId& grpId, uint32_t& transN,
|
||||||
std::list<RsNxsItem*>& reqList);
|
std::list<RsNxsItem*>& reqList);
|
||||||
|
@ -330,6 +330,54 @@ bool RsNxsSerialiser::serialiseNxsGrp(RsNxsGrp *item, void *data, uint32_t *size
|
|||||||
return ok;
|
return ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
bool RsNxsSerialiser::serialiseNxsSyncGrpStats(RsNxsSyncGrpStats *item, void *data, uint32_t *size)
|
||||||
|
{
|
||||||
|
|
||||||
|
#ifdef RSSERIAL_DEBUG
|
||||||
|
std::cerr << "RsNxsSerialiser::serialiseNxsSyncGrpStats()" << std::endl;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
uint32_t tlvsize = sizeNxsSyncGrpStats(item);
|
||||||
|
uint32_t offset = 0;
|
||||||
|
|
||||||
|
if(*size < tlvsize){
|
||||||
|
#ifdef RSSERIAL_DEBUG
|
||||||
|
std::cerr << "RsNxsSerialiser::serialiseNxsSyncGrpStats()" << std::endl;
|
||||||
|
#endif
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
*size = tlvsize;
|
||||||
|
|
||||||
|
bool ok = true;
|
||||||
|
|
||||||
|
ok &= setRsItemHeader(data, tlvsize, item->PacketId(), tlvsize);
|
||||||
|
|
||||||
|
/* skip the header */
|
||||||
|
offset += 8;
|
||||||
|
|
||||||
|
ok &= setRawUInt32(data, *size, &offset, item->request_type);
|
||||||
|
ok &= item->grpId.serialise(data, *size, offset) ;
|
||||||
|
ok &= setRawUInt32(data, *size, &offset, item->number_of_posts);
|
||||||
|
ok &= setRawUInt32(data, *size, &offset, item->last_post_TS);
|
||||||
|
|
||||||
|
if(offset != tlvsize){
|
||||||
|
#ifdef RSSERIAL_DEBUG
|
||||||
|
std::cerr << "RsNxsSerialiser::serialiseSyncGrpStats() FAIL Size Error! " << std::endl;
|
||||||
|
#endif
|
||||||
|
ok = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
#ifdef RSSERIAL_DEBUG
|
||||||
|
if (!ok)
|
||||||
|
{
|
||||||
|
std::cerr << "RsNxsSerialiser::serialiseSyncGrpStats() NOK" << std::endl;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
return ok;
|
||||||
|
}
|
||||||
bool RsNxsSerialiser::serialiseNxsSyncGrp(RsNxsSyncGrp *item, void *data, uint32_t *size)
|
bool RsNxsSerialiser::serialiseNxsSyncGrp(RsNxsSyncGrp *item, void *data, uint32_t *size)
|
||||||
{
|
{
|
||||||
|
|
||||||
@ -707,6 +755,71 @@ RsNxsMsg* RsNxsSerialiser::deserialNxsMsg(void *data, uint32_t *size){
|
|||||||
return item;
|
return item;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RsNxsSyncGrpStats* RsNxsSerialiser::deserialNxsSyncGrpStats(void *data, uint32_t *size){
|
||||||
|
|
||||||
|
#ifdef RSSERIAL_DEBUG
|
||||||
|
std::cerr << "RsNxsSerialiser::deserialNxsSyncGrpStats()" << std::endl;
|
||||||
|
#endif
|
||||||
|
/* get the type and size */
|
||||||
|
uint32_t rstype = getRsItemId(data);
|
||||||
|
uint32_t rssize = getRsItemSize(data);
|
||||||
|
|
||||||
|
uint32_t offset = 0;
|
||||||
|
|
||||||
|
|
||||||
|
if ((RS_PKT_VERSION_SERVICE != getRsItemVersion(rstype)) || (SERVICE_TYPE != getRsItemService(rstype)) || (RS_PKT_SUBTYPE_NXS_SYNC_GRP_STATS != getRsItemSubType(rstype)))
|
||||||
|
{
|
||||||
|
#ifdef RSSERIAL_DEBUG
|
||||||
|
std::cerr << "RsNxsSerialiser::deserialNxsSyncGrpStats() FAIL wrong type" << std::endl;
|
||||||
|
#endif
|
||||||
|
return NULL; /* wrong type */
|
||||||
|
}
|
||||||
|
|
||||||
|
if (*size < rssize) /* check size */
|
||||||
|
{
|
||||||
|
#ifdef RSSERIAL_DEBUG
|
||||||
|
std::cerr << "RsNxsSerialiser::deserialNxsSyncGrpStats() FAIL wrong size" << std::endl;
|
||||||
|
#endif
|
||||||
|
return NULL; /* not enough data */
|
||||||
|
}
|
||||||
|
|
||||||
|
/* set the packet length */
|
||||||
|
*size = rssize;
|
||||||
|
|
||||||
|
bool ok = true;
|
||||||
|
|
||||||
|
RsNxsSyncGrpStats* item = new RsNxsSyncGrpStats(getRsItemService(rstype));
|
||||||
|
/* skip the header */
|
||||||
|
offset += 8;
|
||||||
|
|
||||||
|
ok &= getRawUInt32(data, *size, &offset, &(item->request_type));
|
||||||
|
ok &= item->grpId.deserialise(data, *size, offset) ;
|
||||||
|
ok &= getRawUInt32(data, *size, &offset, &(item->number_of_posts));
|
||||||
|
ok &= getRawUInt32(data, *size, &offset, &(item->last_post_TS));
|
||||||
|
|
||||||
|
if (offset != rssize)
|
||||||
|
{
|
||||||
|
#ifdef RSSERIAL_DEBUG
|
||||||
|
std::cerr << "RsNxsSerialiser::deserialNxsSyncGrpStats() FAIL size mismatch" << std::endl;
|
||||||
|
#endif
|
||||||
|
/* error */
|
||||||
|
delete item;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!ok)
|
||||||
|
{
|
||||||
|
#ifdef RSSERIAL_DEBUG
|
||||||
|
std::cerr << "RsNxsSerialiser::deserialNxsSyncGrpStats() NOK" << std::endl;
|
||||||
|
#endif
|
||||||
|
delete item;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return item;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
RsNxsSyncGrp* RsNxsSerialiser::deserialNxsSyncGrp(void *data, uint32_t *size){
|
RsNxsSyncGrp* RsNxsSerialiser::deserialNxsSyncGrp(void *data, uint32_t *size){
|
||||||
|
|
||||||
@ -1166,7 +1279,17 @@ uint32_t RsNxsSerialiser::sizeNxsSyncGrp(RsNxsSyncGrp *item)
|
|||||||
|
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
uint32_t RsNxsSerialiser::sizeNxsSyncGrpStats(RsNxsSyncGrpStats *item)
|
||||||
|
{
|
||||||
|
uint32_t s = 8; // header size
|
||||||
|
|
||||||
|
s += 4; // request type
|
||||||
|
s += item->grpId.serial_size();
|
||||||
|
s += 4; // number_of_posts
|
||||||
|
s += 4; // last_post_TS
|
||||||
|
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
uint32_t RsNxsSerialiser::sizeNxsSyncGrpItem(RsNxsSyncGrpItem *item)
|
uint32_t RsNxsSerialiser::sizeNxsSyncGrpItem(RsNxsSyncGrpItem *item)
|
||||||
{
|
{
|
||||||
@ -1260,7 +1383,6 @@ void RsNxsSyncMsg::clear()
|
|||||||
syncHash.clear();
|
syncHash.clear();
|
||||||
updateTS = 0;
|
updateTS = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void RsNxsSyncGrpItem::clear()
|
void RsNxsSyncGrpItem::clear()
|
||||||
{
|
{
|
||||||
flag = 0;
|
flag = 0;
|
||||||
@ -1430,7 +1552,23 @@ std::ostream& RsNxsMsg::print(std::ostream &out, uint16_t indent){
|
|||||||
return out;
|
return out;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::ostream& RsNxsSyncGrpStats::print(std::ostream &out, uint16_t indent){
|
||||||
|
|
||||||
|
printRsItemBase(out, "RsNxsSyncGrpStats", indent);
|
||||||
|
uint16_t int_Indent = indent + 2;
|
||||||
|
|
||||||
|
out << "available posts: " << number_of_posts << std::endl;
|
||||||
|
printIndent(out , int_Indent);
|
||||||
|
out << "last update: " << last_post_TS << std::endl;
|
||||||
|
printIndent(out , int_Indent);
|
||||||
|
out << "group ID: " << grpId << std::endl;
|
||||||
|
printIndent(out , int_Indent);
|
||||||
|
out << "request type: " << request_type << std::endl;
|
||||||
|
printIndent(out , int_Indent);
|
||||||
|
|
||||||
|
printRsItemEnd(out ,"RsNxsSyncGrpStats", indent);
|
||||||
|
return out;
|
||||||
|
}
|
||||||
std::ostream& RsNxsTransac::print(std::ostream &out, uint16_t indent){
|
std::ostream& RsNxsTransac::print(std::ostream &out, uint16_t indent){
|
||||||
|
|
||||||
printRsItemBase(out, "RsNxsTransac", indent);
|
printRsItemBase(out, "RsNxsTransac", indent);
|
||||||
|
@ -36,15 +36,17 @@
|
|||||||
#include "serialiser/rstlvkeys.h"
|
#include "serialiser/rstlvkeys.h"
|
||||||
#include "gxs/rsgxsdata.h"
|
#include "gxs/rsgxsdata.h"
|
||||||
|
|
||||||
|
// These items have "flag type" numbers, but this is not used.
|
||||||
|
|
||||||
const uint8_t RS_PKT_SUBTYPE_NXS_SYNC_GRP = 0x0001;
|
const uint8_t RS_PKT_SUBTYPE_NXS_SYNC_GRP = 0x01;
|
||||||
const uint8_t RS_PKT_SUBTYPE_NXS_SYNC_GRP_ITEM = 0x0002;
|
const uint8_t RS_PKT_SUBTYPE_NXS_SYNC_GRP_ITEM = 0x02;
|
||||||
const uint8_t RS_PKT_SUBTYPE_NXS_GRP = 0x0004;
|
const uint8_t RS_PKT_SUBTYPE_NXS_SYNC_GRP_STATS = 0x03;
|
||||||
const uint8_t RS_PKT_SUBTYPE_NXS_SYNC_MSG_ITEM = 0x0008;
|
const uint8_t RS_PKT_SUBTYPE_NXS_GRP = 0x04;
|
||||||
const uint8_t RS_PKT_SUBTYPE_NXS_SYNC_MSG = 0x0010;
|
const uint8_t RS_PKT_SUBTYPE_NXS_SYNC_MSG_ITEM = 0x08;
|
||||||
const uint8_t RS_PKT_SUBTYPE_NXS_MSG = 0x0020;
|
const uint8_t RS_PKT_SUBTYPE_NXS_SYNC_MSG = 0x10;
|
||||||
const uint8_t RS_PKT_SUBTYPE_NXS_TRANS = 0x0040;
|
const uint8_t RS_PKT_SUBTYPE_NXS_MSG = 0x20;
|
||||||
const uint8_t RS_PKT_SUBTYPE_NXS_GRP_PUBLISH_KEY = 0x0080;
|
const uint8_t RS_PKT_SUBTYPE_NXS_TRANS = 0x40;
|
||||||
|
const uint8_t RS_PKT_SUBTYPE_NXS_GRP_PUBLISH_KEY = 0x80;
|
||||||
|
|
||||||
|
|
||||||
// possibility create second service to deal with this functionality
|
// possibility create second service to deal with this functionality
|
||||||
@ -53,7 +55,7 @@ const uint8_t RS_PKT_SUBTYPE_EXT_SEARCH_GRP = 0x0001;
|
|||||||
const uint8_t RS_PKT_SUBTYPE_EXT_SEARCH_MSG = 0x0002;
|
const uint8_t RS_PKT_SUBTYPE_EXT_SEARCH_MSG = 0x0002;
|
||||||
const uint8_t RS_PKT_SUBTYPE_EXT_DELETE_GRP = 0x0004;
|
const uint8_t RS_PKT_SUBTYPE_EXT_DELETE_GRP = 0x0004;
|
||||||
const uint8_t RS_PKT_SUBTYPE_EXT_DELETE_MSG = 0x0008;
|
const uint8_t RS_PKT_SUBTYPE_EXT_DELETE_MSG = 0x0008;
|
||||||
const uint8_t RS_PKT_SUBTYPE_EXT_SEARCH_REQ = 0x0010;
|
const uint8_t RS_PKT_SUBTYPE_EXT_SEARCH_REQ = 0x0010;
|
||||||
|
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
@ -107,6 +109,27 @@ public:
|
|||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/*!
|
||||||
|
* Use to request statistics about a particular group
|
||||||
|
*/
|
||||||
|
class RsNxsSyncGrpStats : public RsNxsItem
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
|
||||||
|
RsNxsSyncGrpStats(uint16_t servtype) : RsNxsItem(servtype, RS_PKT_SUBTYPE_NXS_SYNC_GRP_STATS) {}
|
||||||
|
|
||||||
|
virtual void clear() {}
|
||||||
|
virtual std::ostream &print(std::ostream &out, uint16_t indent);
|
||||||
|
|
||||||
|
static const uint8_t GROUP_INFO_TYPE_REQUEST = 0x01;
|
||||||
|
static const uint8_t GROUP_INFO_TYPE_RESPONSE = 0x02;
|
||||||
|
|
||||||
|
uint32_t request_type; // used to determine the type of request
|
||||||
|
RsGxsGroupId grpId; // id of the group
|
||||||
|
uint32_t number_of_posts; // number of posts in that group
|
||||||
|
uint32_t last_post_TS; // time_stamp of last post
|
||||||
|
};
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* Use to request grp list from peer
|
* Use to request grp list from peer
|
||||||
* Server may advise client peer to use sync file
|
* Server may advise client peer to use sync file
|
||||||
@ -445,6 +468,10 @@ private:
|
|||||||
virtual bool serialiseNxsSyncGrp(RsNxsSyncGrp *item, void *data, uint32_t *size);
|
virtual bool serialiseNxsSyncGrp(RsNxsSyncGrp *item, void *data, uint32_t *size);
|
||||||
virtual RsNxsSyncGrp* deserialNxsSyncGrp(void *data, uint32_t *size);
|
virtual RsNxsSyncGrp* deserialNxsSyncGrp(void *data, uint32_t *size);
|
||||||
|
|
||||||
|
virtual uint32_t sizeNxsSyncGrpStats(RsNxsSyncGrpStats* item);
|
||||||
|
virtual bool serialiseNxsSyncGrpStats(RsNxsSyncGrpStats *item, void *data, uint32_t *size);
|
||||||
|
virtual RsNxsSyncGrpStats* deserialNxsSyncGrpStats(void *data, uint32_t *size);
|
||||||
|
|
||||||
/* for RS_PKT_SUBTYPE_SYNC_GRP_ITEM */
|
/* for RS_PKT_SUBTYPE_SYNC_GRP_ITEM */
|
||||||
|
|
||||||
virtual uint32_t sizeNxsSyncGrpItem(RsNxsSyncGrpItem* item);
|
virtual uint32_t sizeNxsSyncGrpItem(RsNxsSyncGrpItem* item);
|
||||||
|
Loading…
Reference in New Issue
Block a user