added asymmetry in GXS tunnel management to reduce the number of tunnels

This commit is contained in:
csoler 2018-04-11 23:14:10 +02:00
parent 393ff75c90
commit 66df281f25
No known key found for this signature in database
GPG Key ID: 7BCA522266C0804C
3 changed files with 113 additions and 15 deletions

View File

@ -799,6 +799,9 @@ void RsGxsNetService::checkDistantSyncState()
if(online_peers.find(*it2) != online_peers.end()) // check that the peer is online
at_least_one_friend_is_supplier = true ;
// That strategy is likely to create islands of friends connected to each other. There's no real way
// to decide what to do here, except maybe checking the last message TS remotely vs. locally.
if(at_least_one_friend_is_supplier)
{
mGxsNetTunnel->releasePeers(mServType,grpId);

View File

@ -36,7 +36,11 @@
#define GXS_NET_TUNNEL_ERROR() std::cerr << "(EE) GXS_NET_TUNNEL ERROR : "
RsGxsNetTunnelService::RsGxsNetTunnelService(): mGxsNetTunnelMtx("GxsNetTunnel") {}
RsGxsNetTunnelService::RsGxsNetTunnelService(): mGxsNetTunnelMtx("GxsNetTunnel")
{
#warning this is for testing only. In the final version this needs to be initialized with some random content.
memset(mRandomBias,0,RS_GXS_TUNNEL_CONST_RANDOM_BIAS_SIZE) ;
}
//===========================================================================================================================================//
// Transport Items //
@ -287,7 +291,7 @@ RsGxsNetTunnelVirtualPeerId RsGxsNetTunnelService::locked_makeVirtualPeerId() co
//memcpy(mem+RsPeerId::SIZE_IN_BYTES ,group_id.toByteArray(),RsGxsGroupId::SIZE_IN_BYTES) ;
memcpy(mem+RsPeerId::SIZE_IN_BYTES /*+RsGxsGroupId::SIZE_IN_BYTES*/,mRandomBias ,RS_GXS_TUNNEL_CONST_RANDOM_BIAS_SIZE) ;
return RsGxsNetTunnelVirtualPeerId(RsDirUtil::sha1sum(mem,RsPeerId::SIZE_IN_BYTES+RsGxsGroupId::SIZE_IN_BYTES+RS_GXS_TUNNEL_CONST_RANDOM_BIAS_SIZE).toByteArray());
return RsGxsNetTunnelVirtualPeerId(RsDirUtil::sha1sum(mem,RsPeerId::SIZE_IN_BYTES+/*RsGxsGroupId::SIZE_IN_BYTES+*/ RS_GXS_TUNNEL_CONST_RANDOM_BIAS_SIZE).toByteArray());
}
void RsGxsNetTunnelService::dump() const
@ -318,7 +322,9 @@ void RsGxsNetTunnelService::dump() const
for(auto it(mGroups.begin());it!=mGroups.end();++it)
{
std::cerr << " " << it->first << " hash: " << it->second.hash << " policy: " << group_policy_str[it->second.group_policy] << " status: " << group_status_str[it->second.group_status] << " Last contact: " << time(NULL) - it->second.last_contact << " secs ago" << std::endl;
std::cerr << " virtual peers:" << std::endl;
if(!it->second.virtual_peers.empty())
std::cerr << " virtual peers:" << std::endl;
for(auto it2(it->second.virtual_peers.begin());it2!=it->second.virtual_peers.end();++it2)
std::cerr << " " << *it2 << std::endl;
}
@ -334,6 +340,9 @@ void RsGxsNetTunnelService::dump() const
for(auto it2(it->second.providing_set.begin());it2!=it->second.providing_set.end();++it2)
std::cerr << " service " << std::hex << it2->first << std::dec << " " << it2->second.provided_groups.size() << " groups" << std::endl;
}
std::cerr << "Virtual peer turtle => GXS conversion table: " << std::endl;
for(auto it(mTurtle2GxsPeer.begin());it!=mTurtle2GxsPeer.end();++it)
std::cerr << " " << it->first << " => " << it->second << std::endl;
std::cerr << "Hashes: " << std::endl;
for(auto it(mHandledHashes.begin());it!=mHandledHashes.end();++it)
@ -413,7 +422,7 @@ void RsGxsNetTunnelService::receiveTurtleData(RsTurtleGenericTunnelItem *item,co
RsItem *decrypted_item = RsGxsNetTunnelSerializer().deserialise(data,&data_size);
RsGxsNetTunnelVirtualPeerItem *pid_item = dynamic_cast<RsGxsNetTunnelVirtualPeerItem*>(decrypted_item) ;
if(!pid_item)
if(!pid_item) // this handles the case of a KeepAlive packet.
{
delete decrypted_item ;
return ;
@ -555,7 +564,7 @@ void RsGxsNetTunnelService::removeVirtualPeer(const TurtleFileHash& hash, const
if(ginfo.virtual_peers.empty())
{
ginfo.group_status = RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_STATUS_TUNNELS_REQUESTED ;
ginfo.group_status = RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_STATUS_IDLE ;
#ifdef DEBUG_RSGXSNETTUNNEL
GXS_NET_TUNNEL_DEBUG() << " no more virtual peers for group " << group_id << ": setting status to TUNNELS_REQUESTED" << std::endl;
@ -617,30 +626,115 @@ void RsGxsNetTunnelService::data_tick()
{
last_dump = now;
dump();
sendKeepAlivePackets() ;
}
}
void RsGxsNetTunnelService::sendKeepAlivePackets()
{
RS_STACK_MUTEX(mGxsNetTunnelMtx);
RsGxsNetTunnelVirtualPeerId own_gxs_vpid = locked_makeVirtualPeerId() ;
#ifdef DEBUG_RSGXSNETTUNNEL
GXS_NET_TUNNEL_DEBUG() << " sending keep-alive packets. Own GXS peer ID is " << own_gxs_vpid << std::endl;
#endif
// We send KA packets for each GXS virtual peer. The advantage is that unused tunnels will automatically die which eliminates duplicate tunnels
// automatically. We only send from the client side.
for(auto it(mVirtualPeers.begin());it!=mVirtualPeers.end();++it)
if(own_gxs_vpid < it->first)
{
#ifdef DEBUG_RSGXSNETTUNNEL
GXS_NET_TUNNEL_DEBUG() << " sending to virtual peer " << it->first << " through tunnel " << it->second.turtle_virtual_peer_id << std::endl;
#endif
RsGxsNetTunnelKeepAliveItem pitem ;
RsTemporaryMemory tmpmem( RsGxsNetTunnelSerializer().size(&pitem) ) ;
uint32_t len = tmpmem.size();
RsGxsNetTunnelSerializer().serialise(&pitem,tmpmem,&len);
RsTurtleGenericDataItem *encrypted_turtle_item = NULL ;
if(p3turtle::encryptData(tmpmem,len,it->second.encryption_master_key,encrypted_turtle_item))
mPendingTurtleItems.push_back(std::make_pair(it->second.turtle_virtual_peer_id,encrypted_turtle_item)) ;
}
#ifdef DEBUG_RSGXSNETTUNNEL
else
GXS_NET_TUNNEL_DEBUG() << " ignoring virtual peer " << it->first << std::endl;
#endif
}
void RsGxsNetTunnelService::autowash()
{
RS_STACK_MUTEX(mGxsNetTunnelMtx);
#ifdef DEBUG_RSGXSNETTUNNEL
GXS_NET_TUNNEL_DEBUG() << " performing per-group consistency test." << std::endl;
#endif
RsGxsNetTunnelVirtualPeerId own_gxs_vpid = locked_makeVirtualPeerId() ;
for(auto it(mGroups.begin());it!=mGroups.end();++it)
{
RsGxsNetTunnelGroupInfo& ginfo(it->second) ;
bool should_monitor_tunnels = false ;
#ifdef DEBUG_RSGXSNETTUNNEL
GXS_NET_TUNNEL_DEBUG() << " group " << it->first << ": " ;
#endif
// check whether the group already has GXS virtual peers with suppliers. If so, we can set the GRP policy as passive.
if(ginfo.group_policy == RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_POLICY_ACTIVE)
{
bool found = false ;
// check wether one virtual peer provided by GXS has ID > own ID. In this case we leave the priority to it.
for(auto it2(ginfo.virtual_peers.begin());!found && it2!=ginfo.virtual_peers.end();++it2)
{
auto it3 = mTurtle2GxsPeer.find(*it2) ;
if( it3 != mTurtle2GxsPeer.end() && it3->second < own_gxs_vpid)
found = true ;
}
if(found)
{
#ifdef DEBUG_RSGXSNETTUNNEL
std::cerr << " active, with client-side peers : ";
#endif
should_monitor_tunnels = true ;
}
else
{
should_monitor_tunnels = false ;
#ifdef DEBUG_RSGXSNETTUNNEL
std::cerr << " active, and no client-side peers available : " ;
#endif
}
}
#ifdef DEBUG_RSGXSNETTUNNEL
else
std::cerr << " passive : ";
#endif
// check if the group is in active or passive mode, in which case make sure that turtle manages tunnels or not.
// if active, then we check wether they are tunnels are not. If not, we ask turtle to monitor.
RsGxsNetTunnelGroupInfo& ginfo(it->second) ;
if(ginfo.group_policy == RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_POLICY_ACTIVE && ginfo.virtual_peers.empty())
if(should_monitor_tunnels)
{
#ifdef DEBUG_RSGXSNETTUNNEL
std::cerr << " requesting tunnels" << std::endl;
#endif
mTurtle->monitorTunnels(ginfo.hash,this,false) ;
ginfo.group_status = RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_STATUS_TUNNELS_REQUESTED;
}
if(ginfo.group_policy == RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_POLICY_PASSIVE)
else
{
#ifdef DEBUG_RSGXSNETTUNNEL
std::cerr << " dropping tunnels" << std::endl;
#endif
mTurtle->stopMonitoringTunnels(ginfo.hash);
ginfo.group_status = RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_STATUS_IDLE;
}
}
}

View File

@ -129,14 +129,14 @@ struct RsGxsNetTunnelGroupInfo
enum GroupStatus {
RS_GXS_NET_TUNNEL_GRP_STATUS_UNKNOWN = 0x00, // unknown status
RS_GXS_NET_TUNNEL_GRP_STATUS_IDLE = 0x01, // no virtual peers requested, just waiting
RS_GXS_NET_TUNNEL_GRP_STATUS_TUNNELS_REQUESTED = 0x02, // virtual peers requested, and waiting for turtle to answer
// RS_GXS_NET_TUNNEL_GRP_STATUS_TUNNELS_REQUESTED = 0x02, // virtual peers requested, and waiting for turtle to answer
RS_GXS_NET_TUNNEL_GRP_STATUS_VPIDS_AVAILABLE = 0x03 // some virtual peers are available. Data can be read/written
};
enum GroupPolicy {
RS_GXS_NET_TUNNEL_GRP_POLICY_UNKNOWN = 0x00, // nothing has been set
RS_GXS_NET_TUNNEL_GRP_POLICY_PASSIVE = 0x01, // group is available for server side tunnels, but does not explicitely request tunnels
RS_GXS_NET_TUNNEL_GRP_POLICY_ACTIVE = 0x02, // group explicitely request tunnels, if none available
RS_GXS_NET_TUNNEL_GRP_POLICY_ACTIVE = 0x02, // group will explicitely request tunnels, if none available
};
RsGxsNetTunnelGroupInfo() : group_policy(RS_GXS_NET_TUNNEL_GRP_POLICY_PASSIVE),group_status(RS_GXS_NET_TUNNEL_GRP_STATUS_IDLE),last_contact(0),service_id(0) {}
@ -147,7 +147,7 @@ struct RsGxsNetTunnelGroupInfo
TurtleFileHash hash ;
uint16_t service_id ;
std::set<RsGxsNetTunnelVirtualPeerId> virtual_peers ; // list of which virtual peers provide this group. Can me more than 1.
std::set<TurtleVirtualPeerId> virtual_peers ; // list of which virtual peers provide this group. Can me more than 1.
};
class RsGxsNetTunnelService: public RsTurtleClientService, public RsTickingThread
@ -231,6 +231,7 @@ protected:
p3turtle *mTurtle ;
private:
void autowash() ;
void sendKeepAlivePackets() ;
void handleIncoming(RsGxsNetTunnelItem *item) ;
void flush_pending_items();