improved routing algorithm. Fixed several bugs.

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@7269 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2014-04-12 19:58:45 +00:00
parent 9f358c5480
commit 9efb7f4136
7 changed files with 231 additions and 202 deletions

View file

@ -235,8 +235,10 @@ int p3GRouter::tick()
//
if(now > _last_matrix_update_time + RS_GROUTER_MATRIX_UPDATE_PERIOD)
{
RsStackMutex mtx(grMtx) ;
_last_matrix_update_time = now ;
_routing_matrix.updateRoutingProbabilities() ;
_routing_matrix.updateRoutingProbabilities() ; // This should be locked.
}
#ifdef GROUTER_DEBUG
@ -289,10 +291,10 @@ void p3GRouter::autoWash()
time_t now = time(NULL) ;
for(std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::iterator it(_pending_messages.begin());it!=_pending_messages.end();)
if(it->second.last_activity + GROUTER_ITEM_MAX_CACHE_KEEP_TIME < now)
if(it->second.received_time + GROUTER_ITEM_MAX_CACHE_KEEP_TIME < now) // is the item too old for cache
{
#ifdef GROUTER_DEBUG
std::cerr << " Removing cache item " << std::hex << it->first << std::dec << " for key id " << it->second.data_item->destination_key << std::endl;
std::cerr << " Removing cache item " << std::hex << it->first << std::dec << std::endl;
#endif
delete it->second.data_item ;
std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::iterator tmp(it) ;
@ -300,6 +302,14 @@ void p3GRouter::autoWash()
_pending_messages.erase(it) ;
it = tmp ;
}
else if(it->second.data_item != NULL && it->second.status_flags == RS_GROUTER_ROUTING_STATE_SENT && it->second.last_sent+RS_GROUTER_ROUTING_WAITING_TIME < now)
{
it->second.status_flags = RS_GROUTER_ROUTING_STATE_PEND ;
#ifdef GROUTER_DEBUG
std::cerr << " Scheduling the item " << std::hex << it->first << std::dec << " for sending again." << std::endl;
#endif
++it ;
}
else
++it ;
@ -325,35 +335,56 @@ void p3GRouter::routePendingObjects()
mServiceControl->getPeersConnected(RS_SERVICE_TYPE_GROUTER,lst) ;
RsPeerId own_id( mServiceControl->getOwnId() );
std::vector<RsPeerId> pids ;
for(std::set<RsPeerId>::const_iterator it(lst.begin());it!=lst.end();++it)
pids.push_back(*it) ;
// The policy is the following:
//
// - all pending messages should be handled. A msg is pending when it is waiting for routage.
// A pending message should always have a non NULL data item attached.
//
for(std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it(_pending_messages.begin());it!=_pending_messages.end();)
if((it->second.status_flags == RS_GROUTER_ROUTING_STATE_PEND) || (it->second.status_flags == RS_GROUTER_ROUTING_STATE_SENT && it->second.tried_friends.front().time_stamp+RS_GROUTER_ROUTING_WAITING_TIME < now))
if(it->second.status_flags == RS_GROUTER_ROUTING_STATE_PEND)
{
// make sure we have data to send.
//
if(it->second.data_item == NULL)
{
std::cerr << " (EE) Pending item has no data!!" << std::endl;
++it ;
continue ;
}
#ifdef GROUTER_DEBUG
std::cerr << " Msg id: " << std::hex << it->first << std::dec << std::endl;
std::cerr << " Origin: " << it->second.origin.toStdString() << std::endl;
if(!it->second.tried_friends.empty())
{
std::cerr << " Last : " << it->second.tried_friends.front().friend_id.toStdString() << std::endl;
std::cerr << " R Time: " << it->second.tried_friends.front().time_stamp << std::endl;
std::cerr << " S Time: " << it->second.tried_friends.front().time_stamp << std::endl;
}
std::cerr << " Recvd : " << it->second.received_time << std::endl;
std::cerr << " Last M: " << it->second.last_activity << std::endl;
std::cerr << " Recvd : " << now - it->second.received_time << " secs ago." << std::endl;
std::cerr << " Sent : " << now - it->second.last_sent << " secs ago." << std::endl;
std::cerr << " Flags : " << it->second.status_flags << std::endl;
std::cerr << " Dist : " << it->second.data_item->randomized_distance<< std::endl;
std::cerr << " Probabilities: " << std::endl;
#endif
std::vector<RsPeerId> pids ;
for(std::set<RsPeerId>::const_iterator its(lst.begin());its!=lst.end();++its)
if(*its != it->second.origin)
pids.push_back(*its) ;
if(pids.empty()) // no friends to send to!! Send back a give up signal.
{
sendACK(it->second.origin,it->first,RS_GROUTER_ACK_STATE_GVNP) ;
it->second.status_flags = RS_GROUTER_ROUTING_STATE_DEAD ;
delete it->second.data_item ;
it->second.data_item = NULL ;
++it ;
continue ;
}
std::vector<float> probas ; // friends probabilities for online friend list.
RsPeerId routed_friend ; // friend chosen for the next hop
bool should_remove = false ; // should we remove this from the map?
// Retrieve probabilities for this key. This call always succeeds. If no route is known, all probabilities become equal.
//
_routing_matrix.computeRoutingProbabilities(it->second.data_item->destination_key, pids, probas) ;
_routing_matrix.computeRoutingProbabilities(it->second.destination_key, pids, probas) ;
// Compute the maximum branching factor.
@ -386,7 +417,6 @@ void p3GRouter::routePendingObjects()
ftr.nb_friends = probas.size() ;
it->second.tried_friends.push_front(ftr) ;
it->second.status_flags = RS_GROUTER_ROUTING_STATE_SENT ;
#ifdef GROUTER_DEBUG
std::cerr << " Routing probability: " << ftr.probability << std::endl;
@ -400,28 +430,13 @@ void p3GRouter::routePendingObjects()
sendItem(new_item) ;
}
if(should_remove)
{
// We remove from the map. That means the RsItem* has been transfered to somewhere else.
//
#ifdef GROUTER_DEBUG
std::cerr << " Removing item from pending items" << std::endl;
#endif
std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator tmp(it) ;
delete it->second.data_item ;
++tmp ;
_pending_messages.erase(it) ;
it = tmp ;
}
else
++it ;
it->second.status_flags = RS_GROUTER_ROUTING_STATE_SENT ;
it->second.last_sent = now ;
}
else
{
#ifdef GROUTER_DEBUG
std::cerr << "Skipping " << std::hex << it->first << std::dec << ", dest=" << it->second.data_item->destination_key.toStdString() << ", state = " << it->second.status_flags ;
std::cerr << "Skipping " << std::hex << it->first << std::dec << ", state = " << it->second.status_flags ;
if(!it->second.tried_friends.empty())
std::cerr << ", stamp=" << it->second.tried_friends.front().time_stamp << " - " << it->second.tried_friends.front().friend_id.toStdString() << std::endl;
else
@ -467,21 +482,23 @@ uint32_t p3GRouter::computeBranchingFactor(const std::vector<RsPeerId>& friends,
uint32_t dist_index = std::min( (uint32_t)(dist / (float)GROUTER_ITEM_DISTANCE_UNIT), MAX_DIST_INDEX-1) ;
return std::max(2, (int)(friends.size()*branching_factors[dist_index])) ;
}
//// Now temper the branching factor by how likely we are to already have a good guess from the probabilities:
//// - if the largest probability is much larger than the second one
//
//std::vector<float> probs(probas) ;
//std::sort(probs.begin(),probs.end()) ;
//int n=0 ;
//for(int i=probs.size()-1;i>=0;--i)
// if(probs[i] > 0.5 * probs.back())
// ++n ;
//// send the final value
//return std::max(1, std::min(n, (int)(friends.size()*branching_factors[dist_index]))) ;
float p3GRouter::computeMatrixContribution(float base,uint32_t time_shift,float probability)
{
// This function computes the contribution to the routing matrix for an ACK that was
// received. The different variables are:
// base : base contribution. 1.0 for directly received items, 0.5 for indirectly received items.
// time_shift : time in seconds between when the item was sent and when the item was ACKed. This is a clue of
// how far the destination is, and is used to favor fast routes.
// probability : probability with which the item was sent. This should be used for importance-sampling the resulting weight
if(probability == 0.0f)
{
std::cerr << "Probability is NULL !!!!! This should not happen." << std::endl;
return 0.0f ;
}
return base * exp(-float(time_shift) / float(RS_GROUTER_MEAN_EXPECTED_RTT)) / probability ;
}
class peer_comparison_function
@ -604,7 +621,6 @@ std::set<uint32_t> p3GRouter::computeRoutingFriends_old(const std::vector<RsPeer
//
return res ;
}
#endif
void p3GRouter::locked_forwardKey(const RsGRouterPublishKeyItem& item)
{
@ -636,6 +652,8 @@ void p3GRouter::locked_forwardKey(const RsGRouterPublishKeyItem& item)
std::cerr << " Not forwarding to source id " << item.PeerId() << std::endl;
#endif
}
#endif
bool p3GRouter::registerKey(const GRouterKeyId& key,const GRouterServiceId& client_id,const std::string& description)
{
RsStackMutex mtx(grMtx) ;
@ -761,8 +779,11 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item)
#endif
return ;
}
uint32_t next_state ;
uint32_t forward_state = RS_GROUTER_ACK_STATE_UNKNOWN ;
uint32_t next_state = it->second.status_flags;
uint32_t forward_state = RS_GROUTER_ACK_STATE_UNKN ;
bool update_routing_matrix = false ;
time_t now = time(NULL) ;
switch(item->state)
{
@ -779,26 +800,11 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item)
next_state = RS_GROUTER_ROUTING_STATE_ARVD ;
{
#warning UNFINISHED code.
update_routing_matrix = true ;
break ;
// Now compute the weight for that particular item. See with what probabilities it was chosen.
//
// The real formula should be:
// weight = w(ACK type) / probability
//
// ... where probability is the probability with whitch the item was sent in the first place.
//
// The time should also be set so that the routing clue has less importance.
//
float weight = (item->state == RS_GROUTER_ACK_STATE_RCVD)?1.0f : 0.5;
#ifdef GROUTER_DEBUG
std::cerr << " weight = " << weight << std::endl;
#endif
_routing_matrix.addRoutingClue(it->second.data_item->destination_key,item->PeerId(),weight) ;
}
case RS_GROUTER_ACK_STATE_GIVEN_UP: // route is bad. We forward back and update the routing matrix.
case RS_GROUTER_ACK_STATE_GVNP: // route is bad. We forward back and update the routing matrix.
break ;
}
@ -812,16 +818,46 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item)
// Just decrement the list of tried friends
//
bool found = false ;
for(std::list<FriendTrialRecord>::iterator it2(it->second.tried_friends.begin());it2!=it->second.tried_friends.end();++it2)
if( (*it2).friend_id == item->PeerId())
{
if(update_routing_matrix)
{
// Now compute the weight for that particular item. See with what probabilities it was chosen.
//
// The real formula should be:
// weight = w(ACK type) / probability
//
// ... where probability is the probability with whitch the item was sent in the first place.
//
// The time should also be set so that the routing clue has less importance.
//
float base = (item->state == RS_GROUTER_ACK_STATE_RCVD)?1.0f : 0.5 ;
uint32_t time_shift = now - (*it2).time_stamp ;
float probability = (*it2).probability;
float weight = computeMatrixContribution(base,time_shift,probability) ;
#ifdef GROUTER_DEBUG
std::cerr << " base contrib = " << base << std::endl;
std::cerr << " time shift = " << time_shift << std::endl;
std::cerr << " sendind proba = " << probability << std::endl;
std::cerr << " ==> final weight : " << weight << std::endl;
#endif
_routing_matrix.addRoutingClue(it->second.destination_key,item->PeerId(),weight) ;
}
#ifdef GROUTER_DEBUG
std::cerr << " Removing friend try for peer " << item->PeerId() << ". " << it->second.tried_friends.size() << " tries left." << std::endl;
#endif
it->second.tried_friends.erase(it2) ;
found = true ;
break ;
}
if(!found)
std::cerr << " (EE) friend try not found!! This should not happen. Needs debugging." << std::endl;
if(it->second.tried_friends.empty())
{
delete it->second.data_item ;
@ -838,7 +874,6 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item)
forward_state = RS_GROUTER_ACK_STATE_GVNP ;
}
}
it->second.last_activity = time(NULL) ;
// Now send an ACK if necessary.
//
@ -849,13 +884,14 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item)
std::cerr << "ACK triage phase ended. Next state = " << statusString[next_state] << ", forwarded ack=" << ackString[forward_state] << std::endl;
#endif
if(forward_state != RS_GROUTER_ACK_STATE_UNKNOWN && it->second.origin != mLinkMgr->getOwnId())
if(forward_state != RS_GROUTER_ACK_STATE_UNKN && it->second.origin != mLinkMgr->getOwnId())
{
#ifdef GROUTER_DEBUG
std::cerr << " forwarding ACK to origin: " << it->second.origin.toStdString() << std::endl;
#endif
sendACK(it->second.origin,item->mid,item->state) ;
}
it->second.status_flags = next_state ;
}
void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item)
@ -929,9 +965,10 @@ void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item)
info.data_item = item->duplicate() ;
item_copy = info.data_item ;
info.origin = RsPeerId(item->PeerId()) ;
info.origin = item->PeerId() ;
info.received_time = time(NULL) ;
info.last_activity = info.received_time ;
info.last_sent = 0 ;
info.destination_key = item->destination_key ;
info.status_flags = RS_GROUTER_ROUTING_STATE_PEND ;
_pending_messages[item->routing_id] = info ;
@ -981,7 +1018,6 @@ void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item)
if(returned_ack != RS_GROUTER_ACK_STATE_UNKN)
sendACK(item->PeerId(),item->routing_id,returned_ack) ;
itr->second.last_activity = now ;
_changed = true ;
}
@ -1003,10 +1039,11 @@ void p3GRouter::sendData(const GRouterKeyId& destination, RsGRouterGenericDataIt
info.data_item = item ;
info.status_flags = RS_GROUTER_ROUTING_STATE_PEND ;
info.origin = RsPeerId(mLinkMgr->getOwnId()) ;
info.origin = mLinkMgr->getOwnId() ;
info.data_item->randomized_distance = 0 ;
info.last_activity = now ;
info.last_sent = 0 ;
info.received_time = now ;
info.destination_key = destination ;
// Make sure we have a unique id (at least locally).
//
@ -1019,8 +1056,8 @@ void p3GRouter::sendData(const GRouterKeyId& destination, RsGRouterGenericDataIt
#ifdef GROUTER_DEBUG
std::cerr << "p3GRouter::sendGRouterData(): pushing the followign item in the msg pending list:" << std::endl;
std::cerr << " data_item.size = " << info.data_item->data_size << std::endl;
std::cerr << " data_item.byte = " << info.data_item->data_bytes << std::endl;
std::cerr << " destination = " << info.data_item->destination_key << std::endl;
std::cerr << " data_item.byte = " << RsDirUtil::sha1sum(info.data_item->data_bytes,info.data_item->data_size) << std::endl;
std::cerr << " destination = " << info.destination_key << std::endl;
std::cerr << " status = " << info.status_flags << std::endl;
std::cerr << " distance = " << info.data_item->randomized_distance << std::endl;
std::cerr << " origin = " << info.origin.toStdString() << std::endl;
@ -1150,10 +1187,10 @@ bool p3GRouter::getRoutingCacheInfo(std::vector<GRouterRoutingCacheInfo>& infos)
cinfo.mid = it->first ;
cinfo.local_origin = it->second.origin ;
cinfo.destination = it->second.data_item->destination_key ;
cinfo.destination = it->second.destination_key ;
cinfo.time_stamp = it->second.received_time ;
cinfo.status = it->second.status_flags ;
cinfo.data_size = it->second.data_item->data_size ;
cinfo.data_size = (it->second.data_item==NULL)?0:(it->second.data_item->data_size) ;
}
return true ;
}
@ -1197,12 +1234,9 @@ void p3GRouter::debugDump()
for(std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it(_pending_messages.begin());it!=_pending_messages.end();++it)
{
std::cerr << " Msg id: " << std::hex << it->first << std::dec
<< " Local Origin: " << it->second.origin.toStdString() ;
if(it->second.data_item != NULL)
std::cerr << " Destination: " << it->second.data_item->destination_key ;
if(!it->second.tried_friends.empty())
std::cerr << " Time : " << now - it->second.tried_friends.front().time_stamp << " secs ago.";
std::cerr << " Msg id: " << std::hex << it->first << std::dec << " Local Origin: " << it->second.origin.toStdString() ;
std::cerr << " Destination: " << it->second.destination_key ;
std::cerr << " Time : " << now - it->second.last_sent << " secs ago.";
std::cerr << " Status: " << statusString[it->second.status_flags] << std::endl;
}