added automatic detection for need to dist-sync groups

This commit is contained in:
csoler 2018-04-01 22:04:16 +02:00
parent f0f69b8dd9
commit aa59694d88
No known key found for this signature in database
GPG Key ID: 7BCA522266C0804C
5 changed files with 148 additions and 63 deletions

View File

@ -269,6 +269,7 @@
NXS_NET_DEBUG_5 summary of transactions (useful to just know what comes in/out)
NXS_NET_DEBUG_6 group sync statistics (e.g. number of posts at nighbour nodes, etc)
NXS_NET_DEBUG_7 encryption/decryption of transactions
NXS_NET_DEBUG_8 gxs distant sync
***/
//#define NXS_NET_DEBUG_0 1
@ -279,6 +280,7 @@
//#define NXS_NET_DEBUG_5 1
//#define NXS_NET_DEBUG_6 1
//#define NXS_NET_DEBUG_7 1
#define NXS_NET_DEBUG_8 1
//#define NXS_FRAG
@ -312,11 +314,12 @@ static const uint32_t RS_NXS_ITEM_ENCRYPTION_STATUS_GXS_KEY_MISSING = 0x05 ;
// Debug system to allow to print only for some IDs (group, Peer, etc)
#if defined(NXS_NET_DEBUG_0) || defined(NXS_NET_DEBUG_1) || defined(NXS_NET_DEBUG_2) || defined(NXS_NET_DEBUG_3) \
|| defined(NXS_NET_DEBUG_4) || defined(NXS_NET_DEBUG_5) || defined(NXS_NET_DEBUG_6) || defined(NXS_NET_DEBUG_7)
|| defined(NXS_NET_DEBUG_4) || defined(NXS_NET_DEBUG_5) || defined(NXS_NET_DEBUG_6) || defined(NXS_NET_DEBUG_7) \
|| defined(NXS_NET_DEBUG_8)
static const RsPeerId peer_to_print = RsPeerId(std::string("")) ;
static const RsGxsGroupId group_id_to_print = RsGxsGroupId(std::string("")) ; // use this to allow to this group id only, or "" for all IDs
static const uint32_t service_to_print = RS_SERVICE_TYPE_GXS_TRANS ; // use this to allow to this service id only, or 0 for all services
static const uint32_t service_to_print = RS_SERVICE_GXS_TYPE_CHANNELS ; // use this to allow to this service id only, or 0 for all services
// warning. Numbers should be SERVICE IDS (see serialiser/rsserviceids.h. E.g. 0x0215 for forums)
class nullstream: public std::ostream {};
@ -448,6 +451,7 @@ int RsGxsNetService::tick()
{
syncWithPeers();
syncGrpStatistics();
checkDistantSyncState();
mSyncTs = now;
}
@ -566,7 +570,6 @@ void RsGxsNetService::syncWithPeers()
std::set<RsPeerId> peers;
mNetMgr->getOnlineList(mServiceInfo.mServiceType, peers);
#ifdef TODO
if(mAllowDistSync)
{
// Grab all online virtual peers of distant tunnels for the current service.
@ -575,9 +578,8 @@ void RsGxsNetService::syncWithPeers()
mGxsNetTunnel->getVirtualPeers(mServType,vpids);
for(auto it(vpids.begin());it!=vpids.end();++it)
peers.push_back(RsPeerId(*it)) ;
peers.insert(RsPeerId(*it)) ;
}
#endif
if (peers.empty()) {
// nothing to do
@ -735,6 +737,62 @@ void RsGxsNetService::syncWithPeers()
#endif
}
void RsGxsNetService::checkDistantSyncState()
{
if(!mAllowDistSync)
return ;
RsGxsGrpMetaTemporaryMap grpMeta;
mDataStore->retrieveGxsGrpMetaData(grpMeta);
// Go through group statistics and groups without information are re-requested to random peers selected
// among the ones who provided the group info.
#ifdef NXS_NET_DEBUG_8
GXSNETDEBUG___<< "Checking distant sync for all groups." << std::endl;
#endif
// get the list of online peers
std::set<RsPeerId> online_peers;
mNetMgr->getOnlineList(mServiceInfo.mServiceType , online_peers);
uint16_t service_id = ((mServiceInfo.mServiceType >> 8)& 0xffff);
RS_STACK_MUTEX(mNxsMutex) ;
for(auto it(grpMeta.begin());it!=grpMeta.end();++it)
if(it->second->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED) // we only consider subscribed groups here.
{
#warning (cyril) We might need to also remove peers for recently unsubscribed groups
const RsGxsGroupId& grpId(it->first);
const RsGxsGrpConfig& rec = locked_getGrpConfig(grpId) ;
#ifdef NXS_NET_DEBUG_6
GXSNETDEBUG__G(it->first) << " group " << grpId;
#endif
bool at_least_one_friend_is_supplier = false ;
for(auto it2(rec.suppliers.ids.begin());it2!=rec.suppliers.ids.end() && !at_least_one_friend_is_supplier;++it2)
if(online_peers.find(*it2) != online_peers.end()) // check that the peer is online
at_least_one_friend_is_supplier = true ;
if(at_least_one_friend_is_supplier)
{
mGxsNetTunnel->releasePeers(service_id,grpId);
#ifdef NXS_NET_DEBUG_8
GXSNETDEBUG___<< " Group " << grpId << ": suppliers among friends. Releasing peers." << std::endl;
#endif
}
else
{
mGxsNetTunnel->requestPeers(service_id,grpId);
#ifdef NXS_NET_DEBUG_8
GXSNETDEBUG___<< " Group " << grpId << ": no suppliers among friends. Requesting peers." << std::endl;
#endif
}
}
}
void RsGxsNetService::syncGrpStatistics()
{
RS_STACK_MUTEX(mNxsMutex) ;
@ -763,44 +821,44 @@ void RsGxsNetService::syncGrpStatistics()
#endif
if(rec.statistics_update_TS + GROUP_STATS_UPDATE_DELAY < now && rec.suppliers.ids.size() > 0)
{
{
#ifdef NXS_NET_DEBUG_6
GXSNETDEBUG__G(it->first) << " needs update. Randomly asking to some friends" << std::endl;
GXSNETDEBUG__G(it->first) << " needs update. Randomly asking to some friends" << std::endl;
#endif
// randomly select GROUP_STATS_UPDATE_NB_PEERS friends among the suppliers of this group
// randomly select GROUP_STATS_UPDATE_NB_PEERS friends among the suppliers of this group
uint32_t n = RSRandom::random_u32() % rec.suppliers.ids.size() ;
uint32_t n = RSRandom::random_u32() % rec.suppliers.ids.size() ;
std::set<RsPeerId>::const_iterator rit = rec.suppliers.ids.begin();
for(uint32_t i=0;i<n;++i)
++rit ;
std::set<RsPeerId>::const_iterator rit = rec.suppliers.ids.begin();
for(uint32_t i=0;i<n;++i)
++rit ;
for(uint32_t i=0;i<std::min(rec.suppliers.ids.size(),(size_t)GROUP_STATS_UPDATE_NB_PEERS);++i)
{
// we started at a random position in the set, wrap around if the end is reached
if(rit == rec.suppliers.ids.end())
rit = rec.suppliers.ids.begin() ;
RsPeerId peer_id = *rit ;
++rit ;
if(online_peers.find(peer_id) != online_peers.end()) // check that the peer is online
for(uint32_t i=0;i<std::min(rec.suppliers.ids.size(),(size_t)GROUP_STATS_UPDATE_NB_PEERS);++i)
{
// we started at a random position in the set, wrap around if the end is reached
if(rit == rec.suppliers.ids.end())
rit = rec.suppliers.ids.begin() ;
RsPeerId peer_id = *rit ;
++rit ;
if(online_peers.find(peer_id) != online_peers.end()) // check that the peer is online
{
#ifdef NXS_NET_DEBUG_6
GXSNETDEBUG_PG(peer_id,it->first) << " asking friend " << peer_id << " for an update of stats for group " << it->first << std::endl;
GXSNETDEBUG_PG(peer_id,it->first) << " asking friend " << peer_id << " for an update of stats for group " << it->first << std::endl;
#endif
RsNxsSyncGrpStatsItem *grs = new RsNxsSyncGrpStatsItem(mServType) ;
RsNxsSyncGrpStatsItem *grs = new RsNxsSyncGrpStatsItem(mServType) ;
grs->request_type = RsNxsSyncGrpStatsItem::GROUP_INFO_TYPE_REQUEST ;
grs->request_type = RsNxsSyncGrpStatsItem::GROUP_INFO_TYPE_REQUEST ;
grs->grpId = it->first ;
grs->PeerId(peer_id) ;
grs->grpId = it->first ;
grs->PeerId(peer_id) ;
sendItem(grs) ;
sendItem(grs) ;
}
}
}
}
}
#ifdef NXS_NET_DEBUG_6
else
GXSNETDEBUG__G(it->first) << " up to date." << std::endl;

View File

@ -35,6 +35,7 @@
#include "pqi/p3linkmgr.h"
#include "rsitems/rsnxsitems.h"
#include "rsitems/rsgxsupdateitems.h"
#include "rsgxsnettunnel.h"
#include "rsgxsnetutils.h"
#include "pqi/p3cfgmgr.h"
#include "rsgixs.h"
@ -394,6 +395,7 @@ private:
void locked_pushGrpRespFromList(std::list<RsNxsItem*>& respList, const RsPeerId& peer, const uint32_t& transN);
void locked_pushMsgRespFromList(std::list<RsNxsItem*>& itemL, const RsPeerId& sslId, const RsGxsGroupId &grp_id, const uint32_t& transN);
void checkDistantSyncState();
void syncWithPeers();
void syncGrpStatistics();
void addGroupItemToList(NxsTransaction*& tr,
@ -582,6 +584,8 @@ private:
uint32_t mDefaultMsgStorePeriod ;
uint32_t mDefaultMsgSyncPeriod ;
RsGxsNetTunnelService *mGxsNetTunnel;
};
#endif // RSGXSNETSERVICE_H

View File

@ -193,14 +193,14 @@ bool RsGxsNetTunnelService::getVirtualPeers(uint16_t service_id, std::list<RsGxs
if(it->second.providing_set.find(service_id) != it->second.providing_set.end())
peers.push_back(it->first) ;
#ifdef DEBUG_GXS_TUNNEL
#ifdef DEBUG_RSGXSNETTUNNEL
GXS_NET_TUNNEL_DEBUG() << " service " << std::hex << service_id << std::dec << " returning " << peers.size() << " peers." << std::endl;
#endif
return true ;
}
bool RsGxsNetTunnelService::requestPeers(const RsGxsGroupId& group_id)
bool RsGxsNetTunnelService::requestPeers(uint16_t service_id,const RsGxsGroupId& group_id)
{
RS_STACK_MUTEX(mGxsNetTunnelMtx);
@ -211,11 +211,14 @@ bool RsGxsNetTunnelService::requestPeers(const RsGxsGroupId& group_id)
ginfo.group_policy = RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_POLICY_ACTIVE;
// we dont set the group policy here. It will only be set if no peers, or too few peers are available.
#ifdef DEBUG_RSGXSNETTUNNEL
GXS_NET_TUNNEL_DEBUG() << " service " << std::hex << service_id << std::dec << " requesting peers for group " << group_id << std::endl;
#endif
return true;
}
bool RsGxsNetTunnelService::releasePeers(const RsGxsGroupId& group_id)
bool RsGxsNetTunnelService::releasePeers(uint16_t service_id, const RsGxsGroupId& group_id)
{
RS_STACK_MUTEX(mGxsNetTunnelMtx);
@ -226,6 +229,9 @@ bool RsGxsNetTunnelService::releasePeers(const RsGxsGroupId& group_id)
ginfo.group_policy = RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_POLICY_PASSIVE;
mTurtle->stopMonitoringTunnels(ginfo.hash) ;
#ifdef DEBUG_RSGXSNETTUNNEL
GXS_NET_TUNNEL_DEBUG() << " service " << std::hex << service_id << std::dec << " releasing peers for group " << group_id << std::endl;
#endif
return true;
}
@ -329,14 +335,14 @@ void RsGxsNetTunnelService::receiveTurtleData(RsTurtleGenericTunnelItem *item,co
}
// find the group id
auto it = mHandledHashes.find(hash) ;
auto it4 = mHandledHashes.find(hash) ;
if(it == mHandledHashes.end())
if(it4 == mHandledHashes.end())
{
GXS_NET_TUNNEL_ERROR() << "Cannot find hash " << hash << " to be handled by GxsNetTunnel" << std::endl;
return;
}
RsGxsGroupId group_id = it->second;
RsGxsGroupId group_id = it4->second;
// Now check if we got an item to advertise a virtual peer
@ -367,21 +373,33 @@ void RsGxsNetTunnelService::receiveTurtleData(RsTurtleGenericTunnelItem *item,co
#ifdef DEBUG_RSGXSNETTUNNEL
GXS_NET_TUNNEL_DEBUG() << " item is a virtual peer id item with vpid = "<< pid_item->virtual_peer_id << ". Setting virtual peer." << std::endl;
#endif
#ifdef TODO
vp_info.net_service_virtual_peer = pid_item->virtual_peer_id;
vp_info.vpid_status = RsGxsNetTunnelVirtualPeerInfo::RS_GXS_NET_TUNNEL_VP_STATUS_ACTIVE ;
#endif
// we receive a virtual peer id, so we need to update the local information for this peer id
mTurtle2GxsPeer[turtle_virtual_peer_id] = pid_item->virtual_peer_id ;
RsGxsNetTunnelVirtualPeerInfo& vp_info(mVirtualPeers[pid_item->virtual_peer_id]) ;
vp_info.vpid_status = RsGxsNetTunnelVirtualPeerInfo::RS_GXS_NET_TUNNEL_VP_STATUS_ACTIVE ; // status of the peer
vp_info.side = direction; // client/server
vp_info.last_contact = time(NULL); // last time some data was sent/recvd
memcpy(vp_info.encryption_master_key,encryption_master_key,RS_GXS_TUNNEL_CONST_EKEY_SIZE);
vp_info.turtle_virtual_peer_id = turtle_virtual_peer_id; // turtle peer to use when sending data to this vpid.
free(data);
return ;
}
delete decrypted_item ;
// item is a generic data item for the client. Let's store the data in the appropriate incoming data queue.
#ifdef TODO
auto it = mTurtle2GxsPeer.find(turtle_virtual_peer_id) ;
if(it == mTurtle2GxsPeer.end())
{
GXS_NET_TUNNEL_ERROR() << "item received by GxsNetTunnel for vpid " << turtle_virtual_peer_id << " but this vpid is unknown!" << std::endl;
free(data);
return;
}
@ -391,28 +409,27 @@ void RsGxsNetTunnelService::receiveTurtleData(RsTurtleGenericTunnelItem *item,co
if(it2 == mVirtualPeers.end())
{
GXS_NET_TUNNEL_ERROR() << "item received by GxsNetTunnel for GXS vpid " << gxs_vpid " but the virtual peer id is missing!" << std::endl;
GXS_NET_TUNNEL_ERROR() << "item received by GxsNetTunnel for GXS vpid " << gxs_vpid << " but the virtual peer id is missing!" << std::endl;
free(data);
return;
}
RsGxsNetTunnelVirtualPeerInfo& vp_info(it2->second) ;
uint16_t service_id = getRsItemService(getRsItemId(data)) ;
else
{
#ifdef DEBUG_RSGXSNETTUNNEL
GXS_NET_TUNNEL_DEBUG() << " item is GXS data. Storing into incoming list." << std::endl;
GXS_NET_TUNNEL_DEBUG() << "item contains generic data for service " << std::hex << service_id << std::dec << " for VPID " << gxs_vpid << ". Storing in incoming list" << std::endl;
#endif
// push the data into the service incoming data list
RsTlvBinaryData *bind = new RsTlvBinaryData;
bind->tlvtype = 0;
bind->bin_len = data_size;
bind->bin_data = data;
// push the data into the service incoming data list
vp_info.incoming_data.push_back(bind) ;
}
#endif
RsTlvBinaryData *bind = new RsTlvBinaryData;
bind->tlvtype = 0;
bind->bin_len = data_size;
bind->bin_data = data;
vp_info.providing_set[service_id].incoming_data.push_back(bind) ;
}
void RsGxsNetTunnelService::addVirtualPeer(const TurtleFileHash& hash, const TurtleVirtualPeerId& vpid,RsTurtleGenericTunnelItem::Direction dir)

View File

@ -87,6 +87,13 @@
// Therefore, virtual peers are stored separately from groups, because each one can sync multiple groups.
//
// * virtual peers are also shared among services. This reduces the required amount of tunnels and tunnel requests to send.
//
//
// How do we know that a group needs distant sync?
// * look into GrpConfigMap for suppliers. Suppliers is cleared at load.
// * last_update_TS in GrpConfigMap is randomised so it cannot be used
// * we need a way to know that there's no suppliers for good reasons (not that we just started)
// *
typedef RsPeerId RsGxsNetTunnelVirtualPeerId ;
@ -105,16 +112,15 @@ struct RsGxsNetTunnelVirtualPeerInfo
RS_GXS_NET_TUNNEL_VP_STATUS_ACTIVE = 0x02 // virtual peer id is known. Data can transfer.
};
RsGxsNetTunnelVirtualPeerInfo() : vpid_status(RS_GXS_NET_TUNNEL_VP_STATUS_UNKNOWN) { memset(encryption_master_key,0,32) ; }
RsGxsNetTunnelVirtualPeerInfo() : vpid_status(RS_GXS_NET_TUNNEL_VP_STATUS_UNKNOWN), last_contact(0),side(0) { memset(encryption_master_key,0,32) ; }
~RsGxsNetTunnelVirtualPeerInfo() ;
uint8_t vpid_status ; // status of the peer
uint8_t side ; // client/server
uint8_t encryption_master_key[32] ; // key from which the encryption key is derived for each virtual peer (using H(master_key | random IV))
time_t last_contact ; // last time some data was sent/recvd
uint8_t side ; // client/server
uint8_t encryption_master_key[32];
TurtleVirtualPeerId turtle_virtual_peer_id ; // turtle peer to use when sending data to this vpid.
RsGxsGroupId group_id ; // group id
std::map<uint16_t,RsGxsNetTunnelVirtualPeerProvidingSet> providing_set; // partial list of groups provided by this virtual peer id, based on tunnel results, for each service
};
@ -155,13 +161,13 @@ public:
* \brief Manage tunnels for this group
* @param group_id group for which tunnels should be released
*/
bool requestPeers(const RsGxsGroupId&group_id) ;
bool requestPeers(uint16_t service_id, const RsGxsGroupId&group_id) ;
/*!
* \brief Stop managing tunnels for this group
* @param group_id group for which tunnels should be released
*/
bool releasePeers(const RsGxsGroupId&group_id) ;
bool releasePeers(uint16_t service_id,const RsGxsGroupId&group_id) ;
/*!
* \brief Get the list of active virtual peers for a given group. This implies that a tunnel is up and

View File

@ -1386,10 +1386,10 @@ int RsServer::StartupRetroShare()
// create GXS photo service
RsGxsNetService* gxschannels_ns = new RsGxsNetService(
RS_SERVICE_GXS_TYPE_CHANNELS, gxschannels_ds, nxsMgr,
mGxsChannels, mGxsChannels->getServiceInfo(),
mReputations, mGxsCircles,mGxsIdService,
pgpAuxUtils);
RS_SERVICE_GXS_TYPE_CHANNELS, gxschannels_ds, nxsMgr,
mGxsChannels, mGxsChannels->getServiceInfo(),
mReputations, mGxsCircles,mGxsIdService,
pgpAuxUtils,true,true,true);
mGxsChannels->setNetworkExchangeService(gxschannels_ns) ;