rewrote the logic of grouting algorithm to work without key publication. Some constants still need to be tweaked and branching factors is not computed yet.

git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.6-IdCleaning@7200 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2014-03-22 22:28:55 +00:00
parent 0b812e636d
commit e5a6dde5b9
6 changed files with 164 additions and 126 deletions

View file

@ -50,6 +50,7 @@ const uint32_t RS_GROUTER_ACK_STATE_RECEIVED_INDIRECTLY = 0x0002 ; // data was
const uint32_t RS_GROUTER_ACK_STATE_GIVEN_UP = 0x0003 ; // data was given up. No route.
const uint32_t RS_GROUTER_ACK_STATE_NO_ROUTE = 0x0004 ; // data was given up. No route.
const uint32_t RS_GROUTER_ACK_STATE_UNKNOWN = 0x0005 ; // unknown destination key
const uint32_t RS_GROUTER_ACK_STATE_TOO_FAR = 0x0006 ; // dropped because of distance
/***********************************************************************************/
/* Basic GRouter Item Class */
@ -151,6 +152,7 @@ class RsGRouterGenericDataItem: public RsGRouterItem, public RsGRouterNonCopyabl
//
GRouterMsgPropagationId routing_id ;
GRouterKeyId destination_key ;
uint32_t randomized_distance ;
uint32_t data_size ;
uint8_t *data_bytes;

View file

@ -32,8 +32,7 @@ GRouterMatrix::GRouterMatrix()
_proba_need_updating = true ;
}
bool GRouterMatrix::addRoutingClue( const GRouterKeyId& key_id,const GRouterServiceId& sid,float distance,
const std::string& desc_string,const RsPeerId& source_friend)
bool GRouterMatrix::addRoutingClue(const GRouterKeyId& key_id,const RsPeerId& source_friend,float weight)
{
// 1 - get the friend index.
//
@ -44,7 +43,7 @@ bool GRouterMatrix::addRoutingClue( const GRouterKeyId& key_id,const GRouterServ
time_t now = time(NULL) ;
RoutingMatrixHitEntry rc ;
rc.weight = 1.0f / (1.0f + distance) ;
rc.weight = weight ;
rc.time_stamp = now ;
rc.friend_id = fid ;

View file

@ -60,7 +60,7 @@ class GRouterMatrix
// Record one routing clue. The events can possibly be merged in time buckets.
//
bool addRoutingClue(const GRouterKeyId& id,const GRouterServiceId& sid,float distance,const std::string& desc_string,const RsPeerId& source_friend) ;
bool addRoutingClue(const GRouterKeyId& id,const RsPeerId& source_friend,float weight) ;
// Dump info in terminal.
//

View file

@ -50,6 +50,8 @@ static const time_t RS_GROUTER_PUBLISH_CAMPAIGN_PERIOD = 1 *60 ; // Check
static const time_t RS_GROUTER_PUBLISH_KEY_TIME_INTERVAL = 2 *60 ; // Advertise each key once a day at most.
static const time_t RS_GROUTER_ROUTING_WAITING_TIME = 3600 ; // time between two trial of sending a given message
static const time_t RS_GROUTER_KEY_DIFFUSION_MAX_KEEP = 7200 ; // time to keep key diffusion items in cache, to avoid multiple diffusion.
static const uint32_t GROUTER_ITEM_DISTANCE_UNIT = 100 ; // One unit of distance between two peers
static const uint32_t GROUTER_ITEM_MAX_TRAVEL_DISTANCE = 10*100 ; // 10 distance units. That is a lot.
static const uint32_t RS_GROUTER_ROUTING_STATE_UNKN = 0x0000 ; // unknown. Unused.
static const uint32_t RS_GROUTER_ROUTING_STATE_PEND = 0x0001 ; // item is pending. Should be sent asap.
@ -70,6 +72,8 @@ struct FriendTrialRecord
{
RsPeerId friend_id ; // id of the friend
time_t time_stamp ; // time of the last tried
float probability ; // probability at which the item was selected
int nb_friends ; // number of friends at the time of sending the item
};
class GRouterRoutingInfo
@ -81,5 +85,6 @@ class GRouterRoutingInfo
std::list<FriendTrialRecord> tried_friends ; // list of friends to which the item was sent ordered with time.
RsPeerId origin ; // which friend sent us that item
time_t received_time ; // time at which the item was received
uint32_t randomized_distance ; // distance of travel, randomized at each hop.
};

View file

@ -69,6 +69,17 @@
// - which probability was used to chose this friend (will be useful
// to compute the routing contribution if the msg is ACK-ed)
//
// Two probabilities are computed:
// - routing probabilities among connected friends
// * this is computed by the routing matrix
// - branching factor N
// * depends on the depth of the items
// * depends on the distribution of probabilities (min and max)
//
// Once computed,
// - the item is forwarded randomly to N peers drawn from the list of connected peers with the given probabilities.
// - the depth of the item is incremented randomly
//
// - downward: look into routing cache. If info not present, drop the item.
// Forward item into stored direction.
//
@ -137,6 +148,7 @@
// - packet service ID (Can be messenging, channels, etc).
// - packet data (void* + size_t)
// - flags (such as ACK or response required, and packet direction)
// - routed directions and probabilities
// * ACK packet.
// - packet unique ID (the id of the corresponding data)
// - flags (reason for ACK. Could be data delivered, or error, too far, etc)
@ -285,39 +297,6 @@ void p3GRouter::routePendingObjects()
{
RsStackMutex mtx(grMtx) ;
// Go through list of published keys
// broadcast a publishKeyItem for each of them.
//
// The routing rules are the following:
//
// Go through list of cached routing objects. For each object:
// if(Last try is old)
// put the object in pending list
//
// (This loop is costly (could handle lots of items), so it should be done less often.)
//
// Add peer to the list of tried routes with time stamp
// Keep the list of tried friends
//
// Go through list of pendign objects. For each object:
// Select one route direction
// - according to current probabilities from the routing matrix
// - according to list of previous attempts
//
// if(route found)
// forward item, update state and time stamp
// else
// if(I am not the sender)
// send back ACK(given up) // if I am the sender, I will keep trying.
//
// Item has received an ACK
//
// ACK: given up => change route
// ACK: received => Remove item from routed items
//
// The list in _pending_messages is necessarily short and most of the time empty. Once
// treated, objects are stored in _routing_cache, where they wait for an answer.
time_t now = time(NULL) ;
std::cerr << "p3GRouter::routeObjects() triage phase:" << std::endl;
@ -339,54 +318,28 @@ void p3GRouter::routePendingObjects()
std::map<RsPeerId,float> probas ; // friends probabilities for online friend list.
RsPeerId routed_friend ; // friend chosen for the next hop
float best_proba = 0.0f; // temp variable used to select the best proba
bool should_remove = false ; // should we remove this from the map?
// retrieve probabilities for this key.
// Retrieve probabilities for this key. This call always succeeds. If no route is known, all probabilities become equal.
//
if(! _routing_matrix.computeRoutingProbabilities(it->second.data_item->destination_key, lst, probas))
_routing_matrix.computeRoutingProbabilities(it->second.data_item->destination_key, lst, probas) ;
// Compute the branching factor.
int N = computeBranchingFactor(probas,it->second.randomized_distance) ;
// Now use this to select N random peers according to the given probabilities
std::set<RsPeerId> routing_friends = computeRoutingFriends(probas,N) ;
std::cerr << " Routing statistics: " << std::endl;
// Actually send the item.
for(std::set<RsPeerId>::const_iterator its(routing_friends.begin());its!=routing_friends.end();++its)
{
// key does not exist in routing matrix => send back an ACK(unknown)
std::cerr << " Friend : " << (*its) << std::endl;
std::cerr << " [Cannot compute. Unknown destination key!!] " << std::endl;
if(it->second.origin != own_id)
{
std::cerr << " removing item and notifying the sender (" << it->second.origin.toStdString() << ")" << std::endl;
sendACK(it->second.origin,it->first,RS_GROUTER_ACK_STATE_UNKNOWN) ;
// remove item from cache
//
should_remove = true ;
}
std::cerr << " item is ours. Keeping it until a route is known." << std::endl;
// else, select a routing friend at random, or just wait? Wait is probably better.
}
bool friend_found = false ;
for(std::map<RsPeerId,float>::const_iterator it2(probas.begin());it2!=probas.end();++it2)
{
std::cerr << " " << it2->first.toStdString() << " : " << it2->second << std::endl;
// select the peer with highest probability that hasn't yet been tried.
if(it2->second > best_proba && !(it2->first == it->second.tried_friends.front().friend_id))
{
routed_friend = it2->first ;
best_proba = it2->second ;
friend_found = true ;
}
}
std::cerr << " Best route: " << routed_friend.toStdString() << ", with probability " << best_proba << std::endl;
// now, send the item.
if(friend_found)
{
// make a deep copy of the item
RsGRouterGenericDataItem *new_item = it->second.data_item->duplicate() ;
@ -394,26 +347,20 @@ void p3GRouter::routePendingObjects()
FriendTrialRecord ftr ;
ftr.time_stamp = now ;
ftr.friend_id = routed_friend ;
ftr.probability = probas[*its] ;
ftr.nb_friends = probas.size() ;
it->second.tried_friends.push_front(ftr) ;
it->second.status_flags = RS_GROUTER_ROUTING_STATE_SENT ;
std::cerr << " Routing probability: " << ftr.probability << std::endl;
std::cerr << " Sending..." << std::endl;
// send
new_item->PeerId(routed_friend) ;
new_item->PeerId(*its) ;
sendItem(new_item) ;
}
else if(it->second.origin != mLinkMgr->getOwnId() || std::find(lst.begin(),lst.end(),it->second.origin) != lst.end())
{
// There's no correct friend to send this item to. We keep it for a while. If it's too old,
// we discard it. For now, the procedure is to send back an ACK.
std::cerr << " Item has no route candidate. It's too old. " << std::endl;
std::cerr << " sending ACK(no route) to peer " << it->second.origin.toStdString() << std::endl;
sendACK(it->second.origin,it->first,RS_GROUTER_ACK_STATE_NO_ROUTE) ;
should_remove = true ;
}
if(should_remove)
{
@ -431,7 +378,32 @@ void p3GRouter::routePendingObjects()
++it ;
}
else
{
std::cerr << "Skipping " << std::hex << it->first << std::dec << ", dest=" << it->second.data_item->destination_key.toStdString() << ", state = " << it->second.status_flags << ", stamp=" << it->second.tried_friends.front().time_stamp << " - " << it->second.tried_friends.front().friend_id.toStdString() << std::endl;
++it ;
}
}
uint32_t p3GRouter::computeBranchingFactor(const std::map<RsPeerId,float>& probas,uint32_t dist)
{
// This should be made a bit more adaptive ;-)
//
return 1 ;
}
std::set<RsPeerId> p3GRouter::computeRoutingFriends(const std::map<RsPeerId,float>& probas,uint32_t N)
{
// We draw N friends according to the routing probabilitites that are passed as parameter.
//
std::set<RsPeerId> res ;
if(probas.empty())
return res ;
res.insert(probas.begin()->first) ;
return res ;
}
void p3GRouter::publishKeys()
@ -575,7 +547,7 @@ void p3GRouter::handleRecvPublishKeyItem(RsGRouterPublishKeyItem *item)
// update the route matrix
_routing_matrix.addRoutingClue(item->published_key,item->service_id,item->randomized_distance,item->description_string,RsPeerId(item->PeerId())) ;
_routing_matrix.addRoutingClue(item->published_key,RsPeerId(item->PeerId()),1) ;
// forward the key to other peers according to key forwarding cache
@ -602,7 +574,7 @@ void p3GRouter::handleRecvPublishKeyItem(RsGRouterPublishKeyItem *item)
void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item)
{
RsStackMutex mtx(grMtx) ;
std::cerr << "Received ACK item, mid=" << std::hex << item->mid << std::dec << std::endl;
std::cerr << "Received ACK item, mid=" << std::hex << item->mid << std::dec << ", ACK type = "<< item->state << std::endl;
// find the item in the pendign list,
// - if not found, drop.
@ -624,15 +596,28 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item)
switch(item->state)
{
case RS_GROUTER_ACK_STATE_RECEIVED_INDIRECTLY:
// Do nothing. It was indirectly received: fine. We don't need to notify the origin
// otherwise lots of ACKs will flow to the origin.
break ;
case RS_GROUTER_ACK_STATE_RECEIVED:
// Notify the origin. This is the main route and it was successful.
std::cerr << " updating routing matrix." << std::endl;
it->second.status_flags = RS_GROUTER_ROUTING_STATE_ARVD ;
{
#warning UNFINISHED code.
// Now compute the weight for that particular item. See with what probabilities it was chosen.
//
float weight = (item->state == RS_GROUTER_ACK_STATE_RECEIVED)?1.0f : 0.5;
std::cerr << " weight = " << weight << std::endl;
_routing_matrix.addRoutingClue(it->second.data_item->destination_key,item->PeerId(),weight) ;
}
std::cerr << " forwarding ACK to origin: " << it->second.origin.toStdString() << std::endl;
sendACK(it->second.origin,item->mid,RS_GROUTER_ACK_STATE_RECEIVED) ;
sendACK(it->second.origin,item->mid,item->state) ;
break ;
case RS_GROUTER_ACK_STATE_GIVEN_UP: // route is bad. We forward back and update the routing matrix.
@ -645,23 +630,45 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item)
std::cerr << " We're owner: should notify client id" << std::endl;
}
// Always remove the item.
// Just decrement the list of tried friends
//
for(std::list<FriendTrialRecord>::iterator it2(it->second.tried_friends.begin());it2!=it->second.tried_friends.end();++it2)
if( (*it2).friend_id == item->PeerId())
{
std::cerr << " Removing friend try for peer " << item->PeerId() << ". " << it->second.tried_friends.size() << " tries left." << std::endl;
it->second.tried_friends.erase(it2) ;
break ;
}
if(it->second.tried_friends.empty())
{
delete it->second.data_item ;
_pending_messages.erase(it) ;
std::cerr << " No tries left. Removing item from pending list." << std::endl;
}
}
void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item)
{
RsStackMutex mtx(grMtx) ;
std::cerr << "Received data item for key " << item->destination_key << std::endl;
std::cerr << "Received data item for key " << item->destination_key << ", distance = " << item->randomized_distance << std::endl;
// check the item depth. If too large, send a ACK back.
if(item->randomized_distance > GROUTER_ITEM_MAX_TRAVEL_DISTANCE)
{
std::cerr << " Distance is too large: " << item->randomized_distance << " units. Item is dropped." << std::endl;
sendACK(item->PeerId(),item->routing_id,RS_GROUTER_ACK_STATE_TOO_FAR) ;
return ;
}
// Do we have this item in the cache already?
// - if not, add in the pending items
// - if yet. Ignore, or send ACK for shorter route.
std::map<GRouterKeyId,GRouterPublishedKeyInfo>::const_iterator it = _owned_key_ids.find(item->destination_key) ;
std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::const_iterator itr = _pending_messages.find(item->routing_id) ;
std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::iterator itr = _pending_messages.find(item->routing_id) ;
RsGRouterGenericDataItem *item_copy = NULL;
if(itr != _pending_messages.end())
@ -670,7 +677,7 @@ void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item)
item_copy = itr->second.data_item ;
}
else // item is now known. Store it into pending msgs. We make a copy, since the item will be deleted otherwise.
else // item is not known. Store it into pending msgs. We make a copy, since the item will be deleted otherwise.
{
std::cerr << " Item is new. Storing in cache as pending messages." << std::endl;
@ -679,23 +686,24 @@ void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item)
info.data_item = item->duplicate() ;
item_copy = info.data_item ;
if(it != _owned_key_ids.end())
info.status_flags = RS_GROUTER_ROUTING_STATE_ARVD ;
else
info.status_flags = RS_GROUTER_ROUTING_STATE_PEND ;
info.origin = RsPeerId(item->PeerId()) ;
info.received_time = time(NULL) ;
_pending_messages[item->routing_id] = info ;
itr = _pending_messages.find(item->routing_id) ;
}
// Is the item for us? If so, find the client service and send the item back.
//
if(it != _owned_key_ids.end())
if(time(NULL) < it->second.validity_time)
{
// test validity time. If too old, we don't forward.
if(itr->second.status_flags == RS_GROUTER_ROUTING_STATE_ARVD)
sendACK(item->PeerId(), item->routing_id, RS_GROUTER_ACK_STATE_RECEIVED_INDIRECTLY) ;
else
{
sendACK(item->PeerId(), item->routing_id, RS_GROUTER_ACK_STATE_RECEIVED) ;
itr->second.status_flags = RS_GROUTER_ROUTING_STATE_ARVD ;
std::map<GRouterServiceId,GRouterClientService*>::const_iterator its = _registered_services.find(it->second.service_id) ;
@ -707,10 +715,17 @@ void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item)
else
std::cerr << " (EE) weird situation. No service registered for a key that we own. Key id = " << item->destination_key.toStdString() << ", service id = " << it->second.service_id << std::endl;
}
}
else
std::cerr << " (WW) key is outdated. Dropping this item." << std::endl;
{
std::cerr << " key is outdated. The item will be dropped." << std::endl;
return ;
}
else
std::cerr << " Item is not for us. Leaving in pending msgs to be routed later." << std::endl;
{
std::cerr << " item is not for us. Storing in pending mode." << std::endl;
itr->second.status_flags = RS_GROUTER_ROUTING_STATE_PEND ;
}
_changed = true ;
}
@ -739,9 +754,13 @@ void p3GRouter::sendData(const GRouterKeyId& destination, RsGRouterGenericDataIt
GRouterMsgPropagationId propagation_id ;
do { propagation_id = RSRandom::random_u32(); } while(_pending_messages.find(propagation_id) != _pending_messages.end()) ;
item->destination_key = destination ;
item->routing_id = propagation_id ;
std::cerr << "p3GRouter::sendGRouterData(): pushing the followign item in the msg pending list:" << std::endl;
std::cerr << " data_item.size = " << info.data_item->data_size << std::endl;
std::cerr << " data_item.byte = " << info.data_item->data_bytes << std::endl;
std::cerr << " destination = " << info.data_item->destination_key << std::endl;
std::cerr << " status = " << info.status_flags << std::endl;
std::cerr << " origin = " << info.origin.toStdString() << std::endl;
std::cerr << " Recv time = " << info.received_time << std::endl;
@ -854,7 +873,17 @@ void p3GRouter::debugDump()
std::cerr << " [Not shown yet] " << std::endl;
std::cerr << " Data items: " << std::endl;
std::cerr << " [Not shown yet] " << std::endl;
static std::string statusString[4] = { "Unkn","Pend","Sent","Ackn" };
for(std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it(_pending_messages.begin());it!=_pending_messages.end();++it)
std::cerr << " Msg id: " << std::hex << it->first << std::dec
<< " Origin: " << it->second.origin.toStdString()
<< " Time : " << now - it->second.tried_friends.front().time_stamp << " secs ago."
<< " Status: " << statusString[it->second.status_flags] << std::endl;
// << " Last : " << it->second.tried_friends.front().friend_id.toStdString() << std::endl;
// << " Probabilities: " << std::endl;
std::cerr << " Routing matrix: " << std::endl;

View file

@ -133,6 +133,9 @@ class p3GRouter: public RsGRouter, public p3Service, public p3Config
void debugDump() ;
void locked_forwardKey(const RsGRouterPublishKeyItem&) ;
static uint32_t computeBranchingFactor(const std::map<RsPeerId,float>& probas,uint32_t dist) ;
static std::set<RsPeerId> computeRoutingFriends(const std::map<RsPeerId,float>& probas,uint32_t N) ;
//===================================================//
// p3Config methods //
//===================================================//