changed global router routing strategy. Should be more effective now.

This commit is contained in:
csoler 2016-01-30 20:27:56 -05:00
parent 59d4c2c829
commit f0a49a427e
10 changed files with 308 additions and 135 deletions

View file

@ -190,6 +190,7 @@
#include "services/p3idservice.h"
#include "turtle/p3turtle.h"
#include "gxs/rsgixs.h"
#include "retroshare/rspeers.h"
#include "p3grouter.h"
#include "grouteritems.h"
@ -197,7 +198,7 @@
#include "grouterclientservice.h"
/**********************/
//#define GROUTER_DEBUG
#define GROUTER_DEBUG
/**********************/
const std::string p3GRouter::SERVICE_INFO_APP_NAME = "Global Router" ;
@ -855,6 +856,34 @@ void p3GRouter::handleTunnels()
}
}
void p3GRouter::locked_sendToPeers(RsGRouterGenericDataItem *data_item,const std::map<RsPeerId,uint32_t>& peers_and_duplication_factors)
{
// slice the data appropriately and send.
uint32_t saved_duplication_factor = data_item->duplication_factor ; // this little trick avoids copying the item for each peer before slicing it up
for(std::map<RsPeerId,uint32_t>::const_iterator itpid(peers_and_duplication_factors.begin());itpid!=peers_and_duplication_factors.end();++itpid)
{
std::list<RsGRouterTransactionChunkItem*> chunks ;
data_item->duplication_factor = itpid->second;
sliceDataItem(data_item,chunks) ;
for(std::list<RsGRouterTransactionChunkItem*>::const_iterator it2(chunks.begin());it2!=chunks.end();++it2)
locked_sendTransactionData(itpid->first,*(*it2) ) ;
#ifdef GROUTER_DEBUG
std::cerr << " sending " << chunks.size() << " slices to peer " << itpid->first << " with duplication factor = " << itpid->second << std::endl;
#endif
// delete temporary items
for(std::list<RsGRouterTransactionChunkItem*>::const_iterator cit=chunks.begin();cit!=chunks.end();++cit)
delete *cit;
}
data_item->duplication_factor = saved_duplication_factor ;
}
void p3GRouter::routePendingObjects()
{
// Go throught he list of pending messages. For those with a peer ready, send the message to that peer.
@ -885,56 +914,59 @@ void p3GRouter::routePendingObjects()
{
// Look for tunnels and friends where to send the data. Send to both.
std::list<RsPeerId> peers ;
if(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS)
locked_collectAvailableTunnels(it->second.tunnel_hash,peers);
// For now, disable friends. We'll first check that the good old tunnel system works as before.
std::map<RsPeerId,uint32_t> peers_and_duplication_factors ;
// if(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS)
// locked_collectAvailableTunnels(it->second.tunnel_hash,it->second.data_item->duplication_factor,peers_and_duplication_factors);
//
// if(!peers_and_duplication_factors.empty())
// {
//#ifdef GROUTER_DEBUG
// std::cerr << " tunnels available! sending!" << std::endl;
//#endif
// locked_sendToPeers(it->second.data_item,peers_and_duplication_factors) ;
//
// // change item state in waiting list
//
// it->second.data_status = RS_GROUTER_DATA_STATUS_ONGOING ;
// it->second.data_transaction_TS = now ;
//
// pending_messages_changed = true ;
// continue ; // no need to seek for friend asynced routes since tunnels directly go to the final destination!
// }
if(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS)
locked_collectAvailableFriends(it->second.data_item->destination_key,peers, it->second.incoming_routes.ids, it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN);
locked_collectAvailableFriends(it->second.data_item->destination_key,it->second.incoming_routes.ids,it->second.data_item->duplication_factor,peers_and_duplication_factors);
if(peers.empty())
{
#ifdef GROUTER_DEBUG
std::cerr << " no direct friends available" << std::endl;
#endif
// if(!peers_and_duplication_factors.empty())
// {
//#ifdef GROUTER_DEBUG
// std::cerr << " friends available! sending!" << std::endl;
//#endif
// locked_sendToPeers(it->second.data_item,peers_and_duplication_factors) ;
//
// // change item state in waiting list
//
// it->second.data_status = RS_GROUTER_DATA_STATUS_ONGOING ;
// it->second.data_transaction_TS = now ;
//
// pending_messages_changed = true ;
// }
// else
// {
//#ifdef GROUTER_DEBUG
// std::cerr << " no direct friends available" << std::endl;
//#endif
//
// if(it->second.received_time_TS + DIRECT_FRIEND_TRY_DELAY < now && !(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS))
// {
//#ifdef GROUTER_DEBUG
// std::cerr << " enabling tunnels for this message." << std::endl;
//#endif
// it->second.routing_flags |= GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS ;
// }
// }
if(it->second.received_time_TS + DIRECT_FRIEND_TRY_DELAY < now && !(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS))
{
#ifdef GROUTER_DEBUG
std::cerr << " enabling tunnels for this message." << std::endl;
#endif
it->second.routing_flags |= GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS ;
}
continue ;
}
// slice the data appropriately and send.
std::list<RsGRouterTransactionChunkItem*> chunks ;
sliceDataItem(it->second.data_item,chunks) ;
#ifdef GROUTER_DEBUG
if(!peers.empty())
std::cerr << " sending to peers:" << std::endl;
#endif
for(std::list<RsPeerId>::const_iterator itpid(peers.begin());itpid!=peers.end();++itpid)
for(std::list<RsGRouterTransactionChunkItem*>::const_iterator it2(chunks.begin());it2!=chunks.end();++it2)
locked_sendTransactionData(*itpid,*(*it2) ) ;
// delete temporary items
for(std::list<RsGRouterTransactionChunkItem*>::const_iterator cit=chunks.begin();cit!=chunks.end();++cit)
delete *cit;
// change item state in waiting list
it->second.data_status = RS_GROUTER_DATA_STATUS_ONGOING ;
it->second.data_transaction_TS = now ;
pending_messages_changed = true ;
}
else if(it->second.data_status == RS_GROUTER_DATA_STATUS_ONGOING && now > MAX_TRANSACTION_ACK_WAITING_TIME + it->second.data_transaction_TS)
{
@ -1014,70 +1046,161 @@ void p3GRouter::routePendingObjects()
IndicateConfigChanged() ;
}
void p3GRouter::locked_collectAvailableFriends(const GRouterKeyId& gxs_id,std::list<RsPeerId>& friend_peers,const std::set<RsPeerId>& incoming_routes,bool is_origin)
void p3GRouter::locked_collectAvailableFriends(const GRouterKeyId& gxs_id,const std::set<RsPeerId>& incoming_routes,uint32_t duplication_factor, std::map<RsPeerId,uint32_t>& friend_peers_and_duplication_factors)
{
// The strategy is the following:
// Old strategy was the following:
// if origin
// send to multiple neighbors : best and random
// else
// send to a single "best" neighbor (determined by threshold over routing probability),
std::set<RsPeerId> ids ;
mServiceControl->getPeersConnected(getServiceInfo().mServiceType,ids) ;
std::vector<float> probas;
std::vector<RsPeerId> tmp_peers;
// remove previous peers
for(std::set<RsPeerId>::const_iterator it(ids.begin());it!=ids.end();++it)
if(incoming_routes.find(*it) == incoming_routes.end())
tmp_peers.push_back(*it) ;
if(tmp_peers.empty())
return ;
_routing_matrix.computeRoutingProbabilities(gxs_id, tmp_peers, probas) ;
// New strategy is:
//
// Characteristics of the distribution to look at:
// * who's online, who's not
// * all values quite equal
// * single value well above others
// * largest value is small
// Algorithm:
//
// 0 - encode duplicate factor in routed item and allow at most N duplicates
// - when forwarding to N peers, split the duplication factor into N bins, each being proportional to the forwarding probability.
// Example for N=3 and D=10:
//
// p Calculation Final bin
//
// +-0.21--> 0.21*10=2.1 --> 2 0.1 below
// |
// 10 ----+-0.45--> 0.45*10=4.5 --> 4.6-> 5 0.4 above
// |
// +-0.34--> 0.34*10=3.4 --> 3.0-> 3 0
//
//
// 1 - get routing probabilities p_i for all peers as well as the maximum proba p before normalization.
//
// Set N = min(3,item->duplication_factor) // max number of friends to route to
//
// if p < threshold // That means the routage info is too old => Fallback to random routing.
// Select N random online friends and forward to them.
// else
// Let p_i be the probabilities of all peers
// Select all online peers for which p_i >= 0.5*p.
// if !empty
// Update duplication factors according to probabilities and number of peers
// Route to these peers
// else
// Keep the item
//
#ifdef GROUTER_DEBUG
std::cerr << "locked_getAvailableFriends()" << std::endl;
std::cerr << " getting connected friends, computing routing probabilities" << std::endl;
std::cerr << " looking for friends for item to ID " << gxs_id << " duplication factor = " << duplication_factor << std::endl;
std::cerr << " retrieving online friends and all friends lists." << std::endl;
#endif
std::set<RsPeerId> online_ids ;
std::list<RsPeerId> all_ids ;
rsPeers->getFriendList(all_ids) ;
mServiceControl->getPeersConnected(getServiceInfo().mServiceType,online_ids) ;
std::vector<RsPeerId> tmp_peers;
for(std::list<RsPeerId>::const_iterator it(all_ids.begin());it!=all_ids.end();++it)
tmp_peers.push_back(*it) ;
std::vector<float> probas;
float maximum = 1.0;
float max_probability = 0.0;
_routing_matrix.computeRoutingProbabilities(gxs_id, tmp_peers, probas, maximum) ;
#ifdef GROUTER_DEBUG
std::cerr << " initial routing probabilities (maximum=" << maximum << ")" << std::endl;
for(uint32_t i=0;i<tmp_peers.size();++i)
std::cerr << " " << tmp_peers[i] << " " << probas[i] << std::endl;
#endif
if(maximum < RS_GROUTER_PROBABILITY_THRESHOLD_FOR_RANDOM_ROUTING)
{
max_probability = 0.0f ;
#ifdef GROUTER_DEBUG
std::cerr << " max proba: " << maximum << " is below random threshold => using uniform random routing." << std::endl;
#endif
}
else
{
for(uint32_t i=0;i<tmp_peers.size();++i)
max_probability = std::max(max_probability,probas[i]) ;
}
#ifdef GROUTER_DEBUG
std::cerr << " maxi probability=" << max_probability << std::endl;
#endif
// remove incoming peers and peers for which the proba is less than half the maximum proba
float total_probas = 0.0f ;
uint32_t count=0;
for(uint32_t i=0;i<tmp_peers.size();++i)
if(incoming_routes.find(tmp_peers[i]) != incoming_routes.end())
std::cerr << " removing " << tmp_peers[i] << " which is an incoming route" << std::endl;
else if(probas[i] < RS_GROUTER_PROBABILITY_THRESHOLD_BEST_PEERS_SELECT*max_probability)
std::cerr << " removing " << tmp_peers[i] << " because probability is below best peers threshold" << std::endl;
else
{
tmp_peers[count] = tmp_peers[i] ;
probas[count] = (max_probability==0.0)? (0.5+0.001*RSRandom::random_f32()) : probas[i] ;
++count ;
total_probas+=probas[i] ;
}
tmp_peers.resize(count) ;
probas.resize(count) ;
if(tmp_peers.empty())
{
#ifdef GROUTER_DEBUG
std::cerr << " online - available peers empty => giving up." << std::endl;
#endif
return ;
}
// now select the N best peers
#ifdef GROUTER_DEBUG
std::cerr << " Remaining peers and routing probabilities:" << std::endl;
for(uint32_t i=0;i<tmp_peers.size();++i)
std::cerr << " " << tmp_peers[i] << ", probability: " << probas[i] << std::endl;
#endif
uint32_t max_count = is_origin?3:1 ;
float probability_threshold = is_origin?0.0:0.5 ;
#ifdef GROUTER_DEBUG
std::cerr << " position at origin: " << is_origin << " => mac_count=" << max_count << ", proba threshold=" << probability_threshold << std::endl;
#endif
float probability_threshold = RS_GROUTER_PROBABILITY_THRESHOLD_FOR_RANDOM_ROUTING ;
std::vector<std::pair<float,RsPeerId> > mypairs ;
for(uint32_t i=0;i<tmp_peers.size();++i)
mypairs.push_back(std::make_pair(probas[i],tmp_peers[i])) ;
mypairs.push_back(std::make_pair(probas[i]/total_probas,tmp_peers[i])) ;
// now sort them up
std::sort(mypairs.begin(),mypairs.end(),item_comparator_001()) ;
// take the max_count peers that are still above min_probability
uint32_t max_count = std::min(std::min((uint32_t)mypairs.size(),(uint32_t)GROUTER_MAX_BRANCHING_FACTOR),duplication_factor);
uint32_t n=0 ;
float duplication_factor_delta =0.0;
uint32_t duplication_factor_buff =duplication_factor ;
for(std::vector<std::pair<float,RsPeerId> >::const_reverse_iterator it = mypairs.rbegin();it!=mypairs.rend() && n<max_count;++it)
if( (*it).first >= probability_threshold )
for(int i=mypairs.size()-1;i>=0 && n<max_count;--i)
{
friend_peers.push_back( (*it).second ), ++n ;
#ifdef GROUTER_DEBUG
std::cerr << " keeping " << (*it).second << std::endl;
#endif
if(!is_origin) // only collect one peer if we're not at origin.
break ;
float ideal_dupl = duplication_factor*mypairs[i].first - duplication_factor_delta ; // correct what was taken in advance
uint32_t real_dupl = std::min( duplication_factor - max_count+1,std::max(1u,uint32_t(rint(ideal_dupl)))) ;
duplication_factor_delta = real_dupl - ideal_dupl ;
std::cerr << " Peer " << mypairs[i].second << " prob=" << mypairs[i].first << ", ideal_dupl=" << ideal_dupl << ", real=" << real_dupl << ". Delta = " << duplication_factor_delta << std::endl;
friend_peers_and_duplication_factors[ mypairs[i].second ] = real_dupl ; // should be updated correctly
++n ;
}
}
void p3GRouter::locked_collectAvailableTunnels(const TurtleFileHash& hash,std::list<RsPeerId>& tunnel_peers)
void p3GRouter::locked_collectAvailableTunnels(const TurtleFileHash& hash,uint32_t total_duplication,std::map<RsPeerId,uint32_t>& tunnel_peers_and_duplication_factors)
{
time_t now = time(NULL) ;
@ -1110,7 +1233,7 @@ void p3GRouter::locked_collectAvailableTunnels(const TurtleFileHash& hash,std::l
#endif
TurtleVirtualPeerId vpid = *(vpit->second.virtual_peers.begin()) ;
tunnel_peers.push_back(vpid) ;
tunnel_peers_and_duplication_factors[vpid] = total_duplication ;
}
bool p3GRouter::locked_sendTransactionData(const RsPeerId& pid,const RsGRouterTransactionItem& trans_item)
@ -1927,7 +2050,7 @@ bool p3GRouter::sendData(const RsGxsId& destination,const GRouterServiceId& clie
data_item->data_size = data_size ;
data_item->routing_id = propagation_id ;
data_item->randomized_distance = 0 ;
data_item->duplication_factor = GROUTER_MAX_DUPLICATION_FACTOR ;
data_item->service_id = client_id ;
data_item->destination_key = destination ;
data_item->flags = 0 ; // this is unused for now.
@ -1987,7 +2110,7 @@ bool p3GRouter::sendData(const RsGxsId& destination,const GRouterServiceId& clie
grouter_debug() << " data status = " << info.data_status << std::endl;
grouter_debug() << " tunnel status = " << info.tunnel_status << std::endl;
grouter_debug() << " sending attempt= " << info.sending_attempts << std::endl;
grouter_debug() << " distance = " << info.data_item->randomized_distance << std::endl;
grouter_debug() << " duplicate fact = " << info.data_item->duplication_factor << std::endl;
grouter_debug() << " recv time = " << info.received_time_TS << std::endl;
grouter_debug() << " client id = " << std::hex << data_item->service_id << std::dec << std::endl;
grouter_debug() << " tunnel hash = " << info.tunnel_hash << std::endl;
@ -2134,21 +2257,23 @@ bool p3GRouter::getRoutingMatrixInfo(RsGRouter::GRouterRoutingMatrixInfo& info)
info.friend_ids.clear() ;
info.published_keys.clear() ;
std::set<RsPeerId> ids ;
mServiceControl->getPeersConnected(getServiceInfo().mServiceType,ids) ;
std::list<RsPeerId> ids ;
//mServiceControl->getPeersConnected(getServiceInfo().mServiceType,ids) ;
rsPeers->getFriendList(ids) ;
info.published_keys = _owned_key_ids ;
for(std::set<RsPeerId>::const_iterator it(ids.begin());it!=ids.end();++it)
for(std::list<RsPeerId>::const_iterator it(ids.begin());it!=ids.end();++it)
info.friend_ids.push_back(*it) ;
std::vector<GRouterKeyId> known_keys ;
std::vector<float> probas ;
float maximum= 0.0f;
_routing_matrix.getListOfKnownKeys(known_keys) ;
for(uint32_t i=0;i<known_keys.size();++i)
{
_routing_matrix.computeRoutingProbabilities(known_keys[i],info.friend_ids,probas) ;
_routing_matrix.computeRoutingProbabilities(known_keys[i],info.friend_ids,probas,maximum) ;
info.per_friend_probabilities[known_keys[i]] = probas ;
}
@ -2167,6 +2292,7 @@ bool p3GRouter::getRoutingCacheInfo(std::vector<GRouterRoutingCacheInfo>& infos)
cinfo.mid = it->first ;
cinfo.local_origin = it->second.incoming_routes.ids ;
cinfo.destination = it->second.data_item->destination_key ;
cinfo.duplication_factor = it->second.data_item->duplication_factor ;
cinfo.routing_time = it->second.received_time_TS ;
cinfo.last_tunnel_attempt_time = it->second.last_tunnel_request_TS ;
cinfo.last_sent_time = it->second.last_sent_TS ;
@ -2220,15 +2346,16 @@ void p3GRouter::debugDump()
{
grouter_debug() << " Msg id: " << std::hex << it->first << std::dec ;
grouter_debug() << " data hash: " << it->second.item_hash ;
grouter_debug() << " client id: " << std::hex << it->second.client_id << std::dec;
grouter_debug() << " client: " << std::hex << it->second.client_id << std::dec;
grouter_debug() << " Flags: " << std::hex << it->second.routing_flags << std::dec;
grouter_debug() << " Destination: " << it->second.data_item->destination_key ;
grouter_debug() << " Received: " << now - it->second.received_time_TS << " secs ago.";
grouter_debug() << " Last sent: " << now - it->second.last_sent_TS << " secs ago.";
grouter_debug() << " Transaction TS: " << now - it->second.data_transaction_TS << " secs ago.";
grouter_debug() << " Data Status: " << statusString[it->second.data_status] << std::endl;
grouter_debug() << " Tunl Status: " << statusString[it->second.tunnel_status] << std::endl;
grouter_debug() << " Receipt ok: " << (it->second.receipt_item != NULL) << std::endl;
grouter_debug() << " Dest: " << it->second.data_item->destination_key ;
grouter_debug() << " Recd: " << now - it->second.received_time_TS << " secs ago.";
grouter_debug() << " Sent: " << now - it->second.last_sent_TS << " secs ago.";
grouter_debug() << " Trans. TS: " << now - it->second.data_transaction_TS << " secs ago." ;
grouter_debug() << " Data Status: " << statusString[it->second.data_status] ;
grouter_debug() << " Tunl Status: " << statusString[it->second.tunnel_status] ;
grouter_debug() << " Receipt ok: " << (it->second.receipt_item != NULL) ;
grouter_debug() << " Dup: " << it->second.data_item->duplication_factor << std::endl;
}
grouter_debug() << " Tunnels: " << std::endl;
@ -2251,8 +2378,8 @@ void p3GRouter::debugDump()
grouter_debug() << " Routing matrix: " << std::endl;
if(_debug_enabled)
_routing_matrix.debugDump() ;
// if(_debug_enabled)
// _routing_matrix.debugDump() ;
}