improved grouter routing algorithm using network simulator

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@7264 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2014-04-11 21:56:10 +00:00
parent d1e526cdf7
commit dd72809f2d
6 changed files with 260 additions and 170 deletions

View File

@ -46,17 +46,27 @@ static const time_t RS_GROUTER_DEBUG_OUTPUT_PERIOD = 20 ; // Outpu
static const time_t RS_GROUTER_AUTOWASH_PERIOD = 60 ; // Autowash every minute. Not a costly operation. static const time_t RS_GROUTER_AUTOWASH_PERIOD = 60 ; // Autowash every minute. Not a costly operation.
//static const time_t RS_GROUTER_PUBLISH_CAMPAIGN_PERIOD = 10*60 ; // Check for key advertising every 10 minutes //static const time_t RS_GROUTER_PUBLISH_CAMPAIGN_PERIOD = 10*60 ; // Check for key advertising every 10 minutes
//static const time_t RS_GROUTER_PUBLISH_KEY_TIME_INTERVAL = 24*60*60 ; // Advertise each key once a day at most. //static const time_t RS_GROUTER_PUBLISH_KEY_TIME_INTERVAL = 24*60*60 ; // Advertise each key once a day at most.
static const time_t RS_GROUTER_PUBLISH_CAMPAIGN_PERIOD = 1 *60 ; // Check for key advertising every 10 minutes static const time_t RS_GROUTER_MATRIX_UPDATE_PERIOD = 1 *10 ; // Check for key advertising every 10 minutes
static const time_t RS_GROUTER_PUBLISH_KEY_TIME_INTERVAL = 2 *60 ; // Advertise each key once a day at most. static const time_t RS_GROUTER_PUBLISH_KEY_TIME_INTERVAL = 2 *60 ; // Advertise each key once a day at most.
static const time_t RS_GROUTER_ROUTING_WAITING_TIME = 3600 ; // time between two trial of sending a given message static const time_t RS_GROUTER_ROUTING_WAITING_TIME = 2 *60 ; // time between two trial of sending a given message
static const time_t RS_GROUTER_KEY_DIFFUSION_MAX_KEEP = 7200 ; // time to keep key diffusion items in cache, to avoid multiple diffusion. //static const time_t RS_GROUTER_ROUTING_WAITING_TIME = 3600 ; // time between two trial of sending a given message
static const uint32_t GROUTER_ITEM_DISTANCE_UNIT = 256 ; // One unit of distance between two peers static const uint32_t GROUTER_ITEM_DISTANCE_UNIT = 256 ; // One unit of distance between two peers
static const uint32_t GROUTER_ITEM_MAX_TRAVEL_DISTANCE = 16*256 ; // 16 distance units. That is a lot. static const uint32_t GROUTER_ITEM_MAX_TRAVEL_DISTANCE = 16*256 ; // 16 distance units. That is a lot.
static const uint32_t GROUTER_ITEM_MAX_CACHE_KEEP_TIME = 3600 ; // 16 distance units. That is a lot.
static const uint32_t RS_GROUTER_ROUTING_STATE_UNKN = 0x0000 ; // unknown. Unused. static const uint32_t RS_GROUTER_ROUTING_STATE_UNKN = 0x0000 ; // unknown. Unused.
static const uint32_t RS_GROUTER_ROUTING_STATE_PEND = 0x0001 ; // item is pending. Should be sent asap. static const uint32_t RS_GROUTER_ROUTING_STATE_PEND = 0x0001 ; // item is pending. Should be sent asap.
static const uint32_t RS_GROUTER_ROUTING_STATE_SENT = 0x0002 ; // item is sent. Waiting for answer static const uint32_t RS_GROUTER_ROUTING_STATE_SENT = 0x0002 ; // item is sent. Waiting for answer
static const uint32_t RS_GROUTER_ROUTING_STATE_ARVD = 0x0003 ; // item is at destination. The cache only holds it to avoid duplication. static const uint32_t RS_GROUTER_ROUTING_STATE_ARVD = 0x0003 ; // item is at destination. The cache only holds it to avoid duplication.
static const uint32_t RS_GROUTER_ROUTING_STATE_DEAD = 0x0004 ; // item is at a dead end.
static const uint32_t RS_GROUTER_ACK_STATE_UNKN = 0x0000 ; // unknown destination key
static const uint32_t RS_GROUTER_ACK_STATE_RCVD = 0x0001 ; // data was received, directly
static const uint32_t RS_GROUTER_ACK_STATE_IRCV = 0x0002 ; // data was received indirectly
static const uint32_t RS_GROUTER_ACK_STATE_GVNP = 0x0003 ; // data was given up. No route.
static const uint32_t RS_GROUTER_ACK_STATE_NORO = 0x0004 ; // data was given up. No route.
static const uint32_t RS_GROUTER_ACK_STATE_TOOF = 0x0005 ; // dropped because of distance (too far)
class FriendTrialRecord class FriendTrialRecord
{ {
@ -76,6 +86,7 @@ class GRouterRoutingInfo
uint32_t status_flags ; // pending, waiting, etc. uint32_t status_flags ; // pending, waiting, etc.
RsPeerId origin ; // which friend sent us that item RsPeerId origin ; // which friend sent us that item
time_t received_time ; // time at which the item was received time_t received_time ; // time at which the item was received
time_t last_activity ; // time at which the item was received
std::list<FriendTrialRecord> tried_friends ; // list of friends to which the item was sent ordered with time. std::list<FriendTrialRecord> tried_friends ; // list of friends to which the item was sent ordered with time.

View File

@ -205,9 +205,9 @@ p3GRouter::p3GRouter(p3ServiceControl *sc,p3LinkMgr *lm)
addSerialType(new RsGRouterSerialiser()) ; addSerialType(new RsGRouterSerialiser()) ;
_last_autowash_time = 0 ; _last_autowash_time = 0 ;
_last_publish_campaign_time = 0 ;
_last_debug_output_time = 0 ; _last_debug_output_time = 0 ;
_last_config_changed = 0 ; _last_config_changed = 0 ;
_last_matrix_update_time = 0 ;
_random_salt = RSRandom::random_u64() ; _random_salt = RSRandom::random_u64() ;
@ -231,14 +231,11 @@ int p3GRouter::tick()
// //
handleIncoming() ; handleIncoming() ;
// Advertise published keys // Update routing matrix
// //
if(now > _last_publish_campaign_time + RS_GROUTER_PUBLISH_CAMPAIGN_PERIOD) if(now > _last_matrix_update_time + RS_GROUTER_MATRIX_UPDATE_PERIOD)
{ {
_last_publish_campaign_time = now ; _last_matrix_update_time = now ;
//publishKeys() ; // we don't publish keys anymore.
//
_routing_matrix.updateRoutingProbabilities() ; _routing_matrix.updateRoutingProbabilities() ;
} }
@ -291,15 +288,16 @@ void p3GRouter::autoWash()
time_t now = time(NULL) ; time_t now = time(NULL) ;
for(std::map<GRouterKeyPropagationId,time_t>::iterator it(_key_diffusion_time_stamps.begin());it!=_key_diffusion_time_stamps.end();) for(std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::iterator it(_pending_messages.begin());it!=_pending_messages.end();)
if(it->second + RS_GROUTER_KEY_DIFFUSION_MAX_KEEP < now) if(it->second.last_activity + GROUTER_ITEM_MAX_CACHE_KEEP_TIME < now)
{ {
#ifdef GROUTER_DEBUG #ifdef GROUTER_DEBUG
std::cerr << " Removing key diffusion time stamp " << it->second << " for diffusion id " << std::hex << it->first << std::dec << std::endl; std::cerr << " Removing cache item " << std::hex << it->first << std::dec << " for key id " << it->second.data_item->destination_key << std::endl;
#endif #endif
std::map<GRouterKeyPropagationId,time_t>::iterator tmp(it) ; delete it->second.data_item ;
std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::iterator tmp(it) ;
++tmp ; ++tmp ;
_key_diffusion_time_stamps.erase(it) ; _pending_messages.erase(it) ;
it = tmp ; it = tmp ;
} }
else else
@ -308,7 +306,6 @@ void p3GRouter::autoWash()
// look into pending items. // look into pending items.
#ifdef GROUTER_DEBUG #ifdef GROUTER_DEBUG
std::cerr << " Pending key diffusion items: " << _key_diffusion_items.size() << std::endl;
std::cerr << " Pending messages to route : " << _pending_messages.size() << std::endl; std::cerr << " Pending messages to route : " << _pending_messages.size() << std::endl;
#endif #endif
} }
@ -338,8 +335,13 @@ void p3GRouter::routePendingObjects()
#ifdef GROUTER_DEBUG #ifdef GROUTER_DEBUG
std::cerr << " Msg id: " << std::hex << it->first << std::dec << std::endl; std::cerr << " Msg id: " << std::hex << it->first << std::dec << std::endl;
std::cerr << " Origin: " << it->second.origin.toStdString() << std::endl; std::cerr << " Origin: " << it->second.origin.toStdString() << std::endl;
if(!it->second.tried_friends.empty())
{
std::cerr << " Last : " << it->second.tried_friends.front().friend_id.toStdString() << std::endl; std::cerr << " Last : " << it->second.tried_friends.front().friend_id.toStdString() << std::endl;
std::cerr << " Time : " << it->second.tried_friends.front().time_stamp << std::endl; std::cerr << " R Time: " << it->second.tried_friends.front().time_stamp << std::endl;
}
std::cerr << " Recvd : " << it->second.received_time << std::endl;
std::cerr << " Last M: " << it->second.last_activity << std::endl;
std::cerr << " Flags : " << it->second.status_flags << std::endl; std::cerr << " Flags : " << it->second.status_flags << std::endl;
std::cerr << " Dist : " << it->second.data_item->randomized_distance<< std::endl; std::cerr << " Dist : " << it->second.data_item->randomized_distance<< std::endl;
std::cerr << " Probabilities: " << std::endl; std::cerr << " Probabilities: " << std::endl;
@ -355,7 +357,7 @@ void p3GRouter::routePendingObjects()
// Compute the maximum branching factor. // Compute the maximum branching factor.
int N = computeBranchingFactor(pids,probas,it->second.data_item->randomized_distance) ; int N = computeBranchingFactor(pids,it->second.data_item->randomized_distance) ;
// Now use this to select N random peers according to the given probabilities // Now use this to select N random peers according to the given probabilities
@ -419,7 +421,11 @@ void p3GRouter::routePendingObjects()
else else
{ {
#ifdef GROUTER_DEBUG #ifdef GROUTER_DEBUG
std::cerr << "Skipping " << std::hex << it->first << std::dec << ", dest=" << it->second.data_item->destination_key.toStdString() << ", state = " << it->second.status_flags << ", stamp=" << it->second.tried_friends.front().time_stamp << " - " << it->second.tried_friends.front().friend_id.toStdString() << std::endl; std::cerr << "Skipping " << std::hex << it->first << std::dec << ", dest=" << it->second.data_item->destination_key.toStdString() << ", state = " << it->second.status_flags ;
if(!it->second.tried_friends.empty())
std::cerr << ", stamp=" << it->second.tried_friends.front().time_stamp << " - " << it->second.tried_friends.front().friend_id.toStdString() << std::endl;
else
std::cerr << std::endl;
#endif #endif
++it ; ++it ;
} }
@ -443,7 +449,7 @@ uint32_t p3GRouter::computeRandomDistanceIncrement(const RsPeerId& pid,const GRo
return RsDirUtil::sha1sum(tmpmem,total_size).toByteArray()[5] ; return RsDirUtil::sha1sum(tmpmem,total_size).toByteArray()[5] ;
} }
uint32_t p3GRouter::computeBranchingFactor(const std::vector<RsPeerId>& friends,const std::vector<float>& probas,uint32_t dist) uint32_t p3GRouter::computeBranchingFactor(const std::vector<RsPeerId>& friends,uint32_t dist)
{ {
// The branching factor N should ensure that messages have a constant probability of getting to destination. // The branching factor N should ensure that messages have a constant probability of getting to destination.
// What we're computing here is the maximum branching factor. Depending on the routing probabilities, // What we're computing here is the maximum branching factor. Depending on the routing probabilities,
@ -460,20 +466,22 @@ uint32_t p3GRouter::computeBranchingFactor(const std::vector<RsPeerId>& friends,
uint32_t dist_index = std::min( (uint32_t)(dist / (float)GROUTER_ITEM_DISTANCE_UNIT), MAX_DIST_INDEX-1) ; uint32_t dist_index = std::min( (uint32_t)(dist / (float)GROUTER_ITEM_DISTANCE_UNIT), MAX_DIST_INDEX-1) ;
// Now temper the branching factor by how likely we are to already have a good guess from the probabilities: return std::max(2, (int)(friends.size()*branching_factors[dist_index])) ;
// - if the largest probability is much larger than the second one
std::vector<float> probs(probas) ; //// Now temper the branching factor by how likely we are to already have a good guess from the probabilities:
std::sort(probs.begin(),probs.end()) ; //// - if the largest probability is much larger than the second one
int n=0 ; //
//std::vector<float> probs(probas) ;
//std::sort(probs.begin(),probs.end()) ;
//int n=0 ;
for(int i=probs.size()-1;i>=0;--i) //for(int i=probs.size()-1;i>=0;--i)
if(probs[i] > 0.5 * probs.back()) // if(probs[i] > 0.5 * probs.back())
++n ; // ++n ;
// send the final value //// send the final value
return std::max(1, std::min(n, (int)(friends.size()*branching_factors[dist_index]))) ; //return std::max(1, std::min(n, (int)(friends.size()*branching_factors[dist_index]))) ;
} }
class peer_comparison_function class peer_comparison_function
@ -488,6 +496,49 @@ std::set<uint32_t> p3GRouter::computeRoutingFriends(const std::vector<RsPeerId>&
{ {
std::set<uint32_t> res ; std::set<uint32_t> res ;
if(pids.size() != probas.size())
{
std::cerr << __PRETTY_FUNCTION__ << ": ERROR!! pids and probas should have the same size! Returning 0 friends!" << std::endl;
return res ;
}
#ifdef GROUTER_DEBUG
std::cerr << " Computing routing friends. Probabilities are: " << std::endl;
for(uint32_t j=0;j<probas.size();++j)
std::cerr << " " << j << " (" << pids[j] << ") : " << probas[j]<< std::endl;
#endif
// We draw N friends according to the routing probabilitites that are passed as parameter,
// removing duplicates. This has the nice property to randomly select new friends to
// try, but based on how unlikely they are to be correct.
//
// Doesn't need to randomise probabilitites, and allows tto compute a sensible importance sampling
// value to be used when correcting the trajectory.
//
for(uint32_t i=0;i<N;++i)
{
int p = probas.size() ;
// randomly select one peer between 0 and p
float total = 0.0f ; for(uint32_t j=0;j<probas.size();++j) total += probas[j] ; // computes the partial sum of the array
float r = RSRandom::random_f32()*total ;
int k=0; total=probas[0] ; while(total<r) total += probas[++k];
std::cerr << " => Friend " << i << ", between 0 and " << p-1 << ": chose k=" << k << ", peer=" << pids[k] << " with probability " << probas[k] << std::endl;
res.insert(k) ;
}
// We also add a totally random peer, for the sake of discovery new routes.
//
return res ;
}
#ifdef TO_BE_REMOVE
std::set<uint32_t> p3GRouter::computeRoutingFriends_old(const std::vector<RsPeerId>& pids,const std::vector<float>& probas,uint32_t N)
{
std::set<uint32_t> res ;
if(pids.size() != probas.size()) if(pids.size() != probas.size())
{ {
std::cerr << __PRETTY_FUNCTION__ << ": ERROR!! pids and probas should have the same size! Returning 0 friends!" << std::endl; std::cerr << __PRETTY_FUNCTION__ << ": ERROR!! pids and probas should have the same size! Returning 0 friends!" << std::endl;
@ -553,50 +604,8 @@ std::set<uint32_t> p3GRouter::computeRoutingFriends(const std::vector<RsPeerId>&
// //
return res ; return res ;
} }
void p3GRouter::publishKeys()
{
#ifdef SUSPENDED
RsStackMutex mtx(grMtx) ;
// Go through list of published keys
// broadcast a publishKeyItem for each of them.
time_t now = time(NULL) ;
for(std::map<GRouterKeyId, GRouterPublishedKeyInfo>::iterator it(_owned_key_ids.begin());it!=_owned_key_ids.end();++it)
{
GRouterPublishedKeyInfo& info(it->second) ;
if(now > info.last_published_time + RS_GROUTER_PUBLISH_KEY_TIME_INTERVAL)
{
// publish this key
#ifdef GROUTER_DEBUG
std::cerr << "Publishing this key: " << std::endl;
std::cerr << " Key id : " << it->first.toStdString() << std::endl;
std::cerr << " Service id : " << std::hex << info.service_id << std::dec << std::endl;
std::cerr << " Description : " << info.description_string << std::endl;
#endif #endif
RsGRouterPublishKeyItem item ;
item.diffusion_id = RSRandom::random_u32() ;
item.published_key = it->first ;
item.service_id = info.service_id ;
item.randomized_distance = drand48() ;
item.fingerprint.clear() ; // not set
item.description_string = info.description_string ;
item.PeerId(RsPeerId()) ; // no peer id => key is forwarded to all friends.
locked_forwardKey(item) ;
info.last_published_time = now ;
}
}
#endif
}
void p3GRouter::locked_forwardKey(const RsGRouterPublishKeyItem& item) void p3GRouter::locked_forwardKey(const RsGRouterPublishKeyItem& item)
{ {
std::set<RsPeerId> connected_peers ; std::set<RsPeerId> connected_peers ;
@ -640,7 +649,6 @@ bool p3GRouter::registerKey(const GRouterKeyId& key,const GRouterServiceId& clie
GRouterPublishedKeyInfo info ; GRouterPublishedKeyInfo info ;
info.service_id = client_id ; info.service_id = client_id ;
info.description_string = description.substr(0,20); info.description_string = description.substr(0,20);
//info.last_published_time = 0 ; // means never published, se it will be re-published soon.
_owned_key_ids[key] = info ; _owned_key_ids[key] = info ;
#ifdef GROUTER_DEBUG #ifdef GROUTER_DEBUG
@ -684,9 +692,6 @@ void p3GRouter::handleIncoming()
{ {
switch(item->PacketSubType()) switch(item->PacketSubType())
{ {
case RS_PKT_SUBTYPE_GROUTER_PUBLISH_KEY: handleRecvPublishKeyItem(dynamic_cast<RsGRouterPublishKeyItem*>(item)) ;
break ;
case RS_PKT_SUBTYPE_GROUTER_DATA: handleRecvDataItem(dynamic_cast<RsGRouterGenericDataItem*>(item)) ; case RS_PKT_SUBTYPE_GROUTER_DATA: handleRecvDataItem(dynamic_cast<RsGRouterGenericDataItem*>(item)) ;
break ; break ;
@ -699,47 +704,6 @@ void p3GRouter::handleIncoming()
} }
} }
void p3GRouter::handleRecvPublishKeyItem(RsGRouterPublishKeyItem *item)
{
RsStackMutex mtx(grMtx) ;
#ifdef GROUTER_DEBUG
std::cerr << "Received key publish item for key :" << std::endl ;
std::cerr << " diffusion = " << std::hex << item->diffusion_id << std::dec << std::endl ;
std::cerr << " key id = " << item->published_key.toStdString() << std::endl ;
std::cerr << " service id = " << std::hex << item->service_id << std::dec << std::endl;
std::cerr << " distance = " << item->randomized_distance << std::endl;
std::cerr << " description= " << item->description_string << std::endl;
#endif
// update the route matrix
_routing_matrix.addRoutingClue(item->published_key,RsPeerId(item->PeerId()),1) ;
// forward the key to other peers according to key forwarding cache
std::map<GRouterKeyPropagationId,time_t>::iterator it = _key_diffusion_time_stamps.find(item->diffusion_id) ;
bool found = (it != _key_diffusion_time_stamps.end()); // found. We don't propagate further
_key_diffusion_time_stamps[item->diffusion_id] = time(NULL) ; // always stamp
if(found)
{
#ifdef GROUTER_DEBUG
std::cerr << " Key diffusion item already in cache. Not forwardign further." << std::endl;
#endif
return ;
}
// Propagate the item to all other online friends. We don't do this right now, but push items in a queue.
// Doing this we can control the amount of key propagation and avoid flooding.
locked_forwardKey(*item) ;
_changed = true ;
}
void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item) void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item)
{ {
RsStackMutex mtx(grMtx) ; RsStackMutex mtx(grMtx) ;
@ -749,13 +713,44 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item)
// find the item in the pendign list, // find the item in the pendign list,
// - if not found, drop. // - if not found, drop.
//
// ...and act appropriately:
// - item was
// - if we're origin // - if we're origin
// notify the client service // notify the client service
// else // else
// remove item // remove item data
//
// Item states:
// ARVD : item was previously delivered and acknowledge
// PEND : item is not yet handled
// SENT : item has been sent. Awaiting response from peers.
//
// ACK types:
// IRCV : indirectly received
// RCVD : received
// GVNP : Given up (for various reasons, including timed out, no route, etc)
//
// Rules for ACK items:
//
// ACK type/state| Forward back? | New state | Update Matrix | Comment
// --------------+-----------------------+--------------------+--------------------+---------------------------------------------------
// RCVD | | | |
// ARVD | N/A | | |
// SENT | RCVD | ARVD | YES |
// IRCV | | | |
// ARVD | NO | ARVD | YES | Not forwarded because already frwded by same route
// SENT | RCVD | ARVD | YES |
// GVNP | | | |
// Last | YES (Nothing / GVNP)| DEAD | NO | Just decrease tried friends, and forward when all done.
// Not Last | NO | SENT | NO | Just decrease tried friends, and forward when all done.
//
// - always decrease tried friends, whatever the answer. A given friend should send back only one answer.
// * a good statistics is the number of un-answered friends still pending
// * when tried friends are empty, send back an ACK that is:
// - nothing if state = ARVD
// - GVNP if state = SENT
//
// - always keep the item in cache for as long as necessary, in order to avoid forwarding items indefinitely
//
// 1 - determine all state variables: incoming ACK type and current state
// //
std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::iterator it(_pending_messages.find(item->mid)) ; std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::iterator it(_pending_messages.find(item->mid)) ;
@ -766,41 +761,43 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item)
#endif #endif
return ; return ;
} }
uint32_t next_state ;
uint32_t forward_state = RS_GROUTER_ACK_STATE_UNKNOWN ;
switch(item->state) switch(item->state)
{ {
case RS_GROUTER_ACK_STATE_RECEIVED_INDIRECTLY: case RS_GROUTER_ACK_STATE_IRCV:
case RS_GROUTER_ACK_STATE_RECEIVED: case RS_GROUTER_ACK_STATE_RCVD:
// Notify the origin. This is the main route and it was successful. // Notify the origin. This is the main route and it was successful.
#ifdef GROUTER_DEBUG #ifdef GROUTER_DEBUG
std::cerr << " updating routing matrix." << std::endl; std::cerr << " updating routing matrix." << std::endl;
#endif #endif
it->second.status_flags = RS_GROUTER_ROUTING_STATE_ARVD ; if(it->second.status_flags == RS_GROUTER_ROUTING_STATE_SENT)
forward_state = RS_GROUTER_ACK_STATE_RCVD ;
next_state = RS_GROUTER_ROUTING_STATE_ARVD ;
{ {
#warning UNFINISHED code. #warning UNFINISHED code.
// Now compute the weight for that particular item. See with what probabilities it was chosen. // Now compute the weight for that particular item. See with what probabilities it was chosen.
// //
float weight = (item->state == RS_GROUTER_ACK_STATE_RECEIVED)?1.0f : 0.5; // The real formula should be:
// weight = w(ACK type) / probability
//
// ... where probability is the probability with whitch the item was sent in the first place.
//
// The time should also be set so that the routing clue has less importance.
//
float weight = (item->state == RS_GROUTER_ACK_STATE_RCVD)?1.0f : 0.5;
#ifdef GROUTER_DEBUG #ifdef GROUTER_DEBUG
std::cerr << " weight = " << weight << std::endl; std::cerr << " weight = " << weight << std::endl;
#endif #endif
_routing_matrix.addRoutingClue(it->second.data_item->destination_key,item->PeerId(),weight) ; _routing_matrix.addRoutingClue(it->second.data_item->destination_key,item->PeerId(),weight) ;
} }
if(it->second.origin != mLinkMgr->getOwnId())
{
#ifdef GROUTER_DEBUG
std::cerr << " forwarding ACK to origin: " << it->second.origin.toStdString() << std::endl;
#endif
sendACK(it->second.origin,item->mid,item->state) ;
}
break ;
case RS_GROUTER_ACK_STATE_GIVEN_UP: // route is bad. We forward back and update the routing matrix. case RS_GROUTER_ACK_STATE_GIVEN_UP: // route is bad. We forward back and update the routing matrix.
break ; break ;
} }
@ -828,11 +825,36 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item)
if(it->second.tried_friends.empty()) if(it->second.tried_friends.empty())
{ {
delete it->second.data_item ; delete it->second.data_item ;
_pending_messages.erase(it) ; it->second.data_item = NULL ;
// delete item, but keep the cache entry for a while.
#ifdef GROUTER_DEBUG #ifdef GROUTER_DEBUG
std::cerr << " No tries left. Removing item from pending list." << std::endl; std::cerr << " No tries left. Removing item from pending list." << std::endl;
#endif #endif
if(it->second.status_flags != RS_GROUTER_ROUTING_STATE_ARVD)
{
next_state = RS_GROUTER_ROUTING_STATE_DEAD ;
forward_state = RS_GROUTER_ACK_STATE_GVNP ;
}
}
it->second.last_activity = time(NULL) ;
// Now send an ACK if necessary.
//
#ifdef GROUTER_DEBUG
static const std::string statusString[5] = { "Unkn","Pend","Sent","Ackn","Dead" };
static const std::string ackString[6] = { "Unkn","Rcvd","Ircd","Gvnp","Noro","Toof" };
std::cerr << "ACK triage phase ended. Next state = " << statusString[next_state] << ", forwarded ack=" << ackString[forward_state] << std::endl;
#endif
if(forward_state != RS_GROUTER_ACK_STATE_UNKNOWN && it->second.origin != mLinkMgr->getOwnId())
{
#ifdef GROUTER_DEBUG
std::cerr << " forwarding ACK to origin: " << it->second.origin.toStdString() << std::endl;
#endif
sendACK(it->second.origin,item->mid,item->state) ;
} }
} }
@ -850,24 +872,50 @@ void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item)
#ifdef GROUTER_DEBUG #ifdef GROUTER_DEBUG
std::cerr << " Distance is too large: " << item->randomized_distance << " units. Item is dropped." << std::endl; std::cerr << " Distance is too large: " << item->randomized_distance << " units. Item is dropped." << std::endl;
#endif #endif
sendACK(item->PeerId(),item->routing_id,RS_GROUTER_ACK_STATE_TOO_FAR) ; sendACK(item->PeerId(),item->routing_id,RS_GROUTER_ACK_STATE_GVNP) ;
return ; return ;
} }
time_t now = time(NULL) ;
// Do we have this item in the cache already? // Do we have this item in the cache already?
// - if not, add in the pending items // - if not, add in the pending items
// - if yet. Ignore, or send ACK for shorter route. // - if yet. Ignore, or send ACK for shorter route.
// Multiple cases to handle for both the ACK that is sent back and the next state of the flags
// for current node, depending on whether the item is already here are not, and what is the
// current state of the item cache:
//
// | Not in cache | STATE_PEND | STATE_SENT | STATE_ARVD
// ------------------------+---------------------+-----------------------+--------------------+-------------------
// Acknowledgement | | | |
// Ours | ACK_RCVD | - | - | ACK_IRVD
// Not ours | - | - | - | ACK_IRVD
// | | | |
// Next state | | | |
// Ours | STATE_ARVD | STATE_PEND | STATE_SENT | STATE_ARVD
// Not ours | STATE_PEND | STATE_PEND | STATE_SENT | STATE_ARVD
// | | | |
//
// Item not already here => set to STATE_PEND
//
// N = don't send back any acknowledgement
// - = unrelevant
//
std::map<GRouterKeyId,GRouterPublishedKeyInfo>::const_iterator it = _owned_key_ids.find(item->destination_key) ; std::map<GRouterKeyId,GRouterPublishedKeyInfo>::const_iterator it = _owned_key_ids.find(item->destination_key) ;
std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::iterator itr = _pending_messages.find(item->routing_id) ; std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::iterator itr = _pending_messages.find(item->routing_id) ;
RsGRouterGenericDataItem *item_copy = NULL; RsGRouterGenericDataItem *item_copy = NULL;
uint32_t new_status_flags = RS_GROUTER_ROUTING_STATE_UNKN;
uint32_t returned_ack = RS_GROUTER_ACK_STATE_UNKN;
// Is the item known?
//
if(itr != _pending_messages.end()) if(itr != _pending_messages.end())
{ {
#ifdef GROUTER_DEBUG #ifdef GROUTER_DEBUG
std::cerr << " Item is already there. Nothing to do. Should we update the cache?" << std::endl; std::cerr << " Item is already there. Nothing to do. Should we update the cache?" << std::endl;
#endif #endif
item_copy = itr->second.data_item ; item_copy = itr->second.data_item ;
} }
else // item is not known. Store it into pending msgs. We make a copy, since the item will be deleted otherwise. else // item is not known. Store it into pending msgs. We make a copy, since the item will be deleted otherwise.
@ -883,9 +931,13 @@ void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item)
info.origin = RsPeerId(item->PeerId()) ; info.origin = RsPeerId(item->PeerId()) ;
info.received_time = time(NULL) ; info.received_time = time(NULL) ;
info.last_activity = info.received_time ;
info.status_flags = RS_GROUTER_ROUTING_STATE_PEND ;
_pending_messages[item->routing_id] = info ; _pending_messages[item->routing_id] = info ;
itr = _pending_messages.find(item->routing_id) ; itr = _pending_messages.find(item->routing_id) ;
new_status_flags = itr->second.status_flags ;
itr->second.received_time = now ;
} }
// Is the item for us? If so, find the client service and send the item back. // Is the item for us? If so, find the client service and send the item back.
@ -893,12 +945,14 @@ void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item)
if(it != _owned_key_ids.end()) if(it != _owned_key_ids.end())
{ {
if(itr->second.status_flags == RS_GROUTER_ROUTING_STATE_ARVD) if(itr->second.status_flags == RS_GROUTER_ROUTING_STATE_ARVD)
sendACK(item->PeerId(), item->routing_id, RS_GROUTER_ACK_STATE_RECEIVED_INDIRECTLY) ; returned_ack = RS_GROUTER_ACK_STATE_IRCV ;
else else
{ {
sendACK(item->PeerId(), item->routing_id, RS_GROUTER_ACK_STATE_RECEIVED) ; returned_ack = RS_GROUTER_ACK_STATE_RCVD ;
itr->second.status_flags = RS_GROUTER_ROUTING_STATE_ARVD ; new_status_flags = RS_GROUTER_ROUTING_STATE_ARVD ;
// notify the client service.
//
std::map<GRouterServiceId,GRouterClientService*>::const_iterator its = _registered_services.find(it->second.service_id) ; std::map<GRouterServiceId,GRouterClientService*>::const_iterator its = _registered_services.find(it->second.service_id) ;
if(its != _registered_services.end()) if(its != _registered_services.end())
@ -917,11 +971,17 @@ void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item)
else else
{ {
#ifdef GROUTER_DEBUG #ifdef GROUTER_DEBUG
std::cerr << " item is not for us. Storing in pending mode." << std::endl; std::cerr << " item is not for us. Storing in pending mode and not notifying nor ACKs." << std::endl;
#endif #endif
itr->second.status_flags = RS_GROUTER_ROUTING_STATE_PEND ;
} }
std::cerr << " after triage: status = " << new_status_flags << ", ack = " << returned_ack << std::endl;
if(new_status_flags != RS_GROUTER_ROUTING_STATE_UNKN) itr->second.status_flags = new_status_flags ;
if(returned_ack != RS_GROUTER_ACK_STATE_UNKN)
sendACK(item->PeerId(),item->routing_id,returned_ack) ;
itr->second.last_activity = now ;
_changed = true ; _changed = true ;
} }
@ -939,10 +999,14 @@ void p3GRouter::sendData(const GRouterKeyId& destination, RsGRouterGenericDataIt
// //
GRouterRoutingInfo info ; GRouterRoutingInfo info ;
time_t now = time(NULL) ;
info.data_item = item ; info.data_item = item ;
info.status_flags = RS_GROUTER_ROUTING_STATE_PEND ; info.status_flags = RS_GROUTER_ROUTING_STATE_PEND ;
info.origin = RsPeerId(mLinkMgr->getOwnId()) ; info.origin = RsPeerId(mLinkMgr->getOwnId()) ;
info.data_item->randomized_distance = 0 ; info.data_item->randomized_distance = 0 ;
info.last_activity = now ;
info.received_time = now ;
// Make sure we have a unique id (at least locally). // Make sure we have a unique id (at least locally).
// //
@ -1110,7 +1174,6 @@ void p3GRouter::debugDump()
std::cerr << " Key id : " << it->first.toStdString() << std::endl; std::cerr << " Key id : " << it->first.toStdString() << std::endl;
std::cerr << " Service id : " << std::hex << it->second.service_id << std::dec << std::endl; std::cerr << " Service id : " << std::hex << it->second.service_id << std::dec << std::endl;
std::cerr << " Description : " << it->second.description_string << std::endl; std::cerr << " Description : " << it->second.description_string << std::endl;
//std::cerr << " Last published: " << now - it->second.last_published_time << " secs ago" << std::endl;
} }
std::cerr << " Registered services: " << std::endl; std::cerr << " Registered services: " << std::endl;
@ -1118,6 +1181,7 @@ void p3GRouter::debugDump()
for(std::map<GRouterServiceId,GRouterClientService *>::const_iterator it(_registered_services.begin() );it!=_registered_services.end();++it) for(std::map<GRouterServiceId,GRouterClientService *>::const_iterator it(_registered_services.begin() );it!=_registered_services.end();++it)
std::cerr << " " << std::hex << it->first << " " << std::dec << (void*)it->second << std::endl; std::cerr << " " << std::hex << it->first << " " << std::dec << (void*)it->second << std::endl;
#ifdef TO_BE_REMOVE
std::cerr << " Key diffusion cache: " << std::endl; std::cerr << " Key diffusion cache: " << std::endl;
for(std::map<GRouterKeyPropagationId,time_t>::const_iterator it(_key_diffusion_time_stamps.begin() );it!=_key_diffusion_time_stamps.end();++it) for(std::map<GRouterKeyPropagationId,time_t>::const_iterator it(_key_diffusion_time_stamps.begin() );it!=_key_diffusion_time_stamps.end();++it)
@ -1125,17 +1189,22 @@ void p3GRouter::debugDump()
std::cerr << " Key diffusion items: " << std::endl; std::cerr << " Key diffusion items: " << std::endl;
std::cerr << " [Not shown yet] " << std::endl; std::cerr << " [Not shown yet] " << std::endl;
#endif
std::cerr << " Data items: " << std::endl; std::cerr << " Data items: " << std::endl;
static const std::string statusString[4] = { "Unkn","Pend","Sent","Ackn" }; static const std::string statusString[4] = { "Unkn","Pend","Sent","Ackn" };
for(std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it(_pending_messages.begin());it!=_pending_messages.end();++it) for(std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it(_pending_messages.begin());it!=_pending_messages.end();++it)
{
std::cerr << " Msg id: " << std::hex << it->first << std::dec std::cerr << " Msg id: " << std::hex << it->first << std::dec
<< " Local Origin: " << it->second.origin.toStdString() << " Local Origin: " << it->second.origin.toStdString() ;
<< " Destination: " << it->second.data_item->destination_key if(it->second.data_item != NULL)
<< " Time : " << now - it->second.tried_friends.front().time_stamp << " secs ago." std::cerr << " Destination: " << it->second.data_item->destination_key ;
<< " Status: " << statusString[it->second.status_flags] << std::endl; if(!it->second.tried_friends.empty())
std::cerr << " Time : " << now - it->second.tried_friends.front().time_stamp << " secs ago.";
std::cerr << " Status: " << statusString[it->second.status_flags] << std::endl;
}
// << " Last : " << it->second.tried_friends.front().friend_id.toStdString() << std::endl; // << " Last : " << it->second.tried_friends.front().friend_id.toStdString() << std::endl;
// << " Probabilities: " << std::endl; // << " Probabilities: " << std::endl;

View File

@ -149,13 +149,12 @@ class p3GRouter: public RsGRouter, public p3Service, public p3Config
void autoWash() ; void autoWash() ;
void routePendingObjects() ; void routePendingObjects() ;
void handleIncoming() ; void handleIncoming() ;
void publishKeys() ;
void debugDump() ; void debugDump() ;
void locked_forwardKey(const RsGRouterPublishKeyItem&) ; void locked_forwardKey(const RsGRouterPublishKeyItem&) ;
// utility functions // utility functions
// //
static uint32_t computeBranchingFactor(const std::vector<RsPeerId>& friends,const std::vector<float>& probas,uint32_t dist) ; static uint32_t computeBranchingFactor(const std::vector<RsPeerId>& friends,uint32_t dist) ;
static std::set<uint32_t> computeRoutingFriends(const std::vector<RsPeerId>& friends,const std::vector<float>& probas,uint32_t N) ; static std::set<uint32_t> computeRoutingFriends(const std::vector<RsPeerId>& friends,const std::vector<float>& probas,uint32_t N) ;
uint32_t computeRandomDistanceIncrement(const RsPeerId& pid,const GRouterKeyId& destination_id) ; uint32_t computeRandomDistanceIncrement(const RsPeerId& pid,const GRouterKeyId& destination_id) ;
@ -200,6 +199,7 @@ class p3GRouter: public RsGRouter, public p3Service, public p3Config
// //
std::map<GRouterKeyId, GRouterPublishedKeyInfo> _owned_key_ids ; std::map<GRouterKeyId, GRouterPublishedKeyInfo> _owned_key_ids ;
#ifdef TO_BE_REMOVED
// Key publish cache and buffers // Key publish cache and buffers
// Handles key publish items routes and forwarding info. // Handles key publish items routes and forwarding info.
// //
@ -213,6 +213,9 @@ class p3GRouter: public RsGRouter, public p3Service, public p3Config
// //
std::priority_queue<RsGRouterPublishKeyItem *> _key_diffusion_items ; std::priority_queue<RsGRouterPublishKeyItem *> _key_diffusion_items ;
void handleRecvPublishKeyItem(RsGRouterPublishKeyItem *item) ;
#endif
// Registered services. These are known to the different peers with a common id, // Registered services. These are known to the different peers with a common id,
// so it's important to keep consistency here. This map is volatile, and re-created at each startup of // so it's important to keep consistency here. This map is volatile, and re-created at each startup of
// the software, when newly created services register themselves. // the software, when newly created services register themselves.
@ -221,7 +224,6 @@ class p3GRouter: public RsGRouter, public p3Service, public p3Config
// Data handling ethods // Data handling ethods
// //
void handleRecvPublishKeyItem(RsGRouterPublishKeyItem *item) ;
void handleRecvDataItem(RsGRouterGenericDataItem *item); void handleRecvDataItem(RsGRouterGenericDataItem *item);
void handleRecvACKItem(RsGRouterACKItem *item); void handleRecvACKItem(RsGRouterACKItem *item);
@ -238,7 +240,7 @@ class p3GRouter: public RsGRouter, public p3Service, public p3Config
bool _changed ; bool _changed ;
time_t _last_autowash_time ; time_t _last_autowash_time ;
time_t _last_publish_campaign_time ; time_t _last_matrix_update_time ;
time_t _last_debug_output_time ; time_t _last_debug_output_time ;
time_t _last_config_changed ; time_t _last_config_changed ;

View File

@ -28,13 +28,13 @@ NetworkSimulatorGUI::NetworkSimulatorGUI(Network& net)
void NetworkSimulatorGUI::updateSelectedNode(int node_id) void NetworkSimulatorGUI::updateSelectedNode(int node_id)
{ {
_turtle_router_statistics->setTurtleRouter( _viewer->network().node(node_id).turtle_service() ) ; const RsTurtle *turtle = _viewer->network().node(node_id).turtle_service() ;
_global_router_statistics->setGlobalRouter( _viewer->network().node(node_id).global_router_service() ) ; const RsGRouter *grouter = _viewer->network().node(node_id).global_router_service() ;
_turtle_router_statistics->updateDisplay() ; _turtle_router_statistics->setTurtleRouter( turtle ) ;
_global_router_statistics->updateDisplay() ; _global_router_statistics->setGlobalRouter( grouter ) ;
std::cerr << "Selected objects: turtle=" << std::hex << _viewer->network().node(node_id).turtle_service() << ", grouter=" << _viewer->network().node(node_id).global_router_service() << std::dec << std::endl; std::cerr << "Selected objects: turtle=" << std::hex << turtle << ", grouter=" << grouter << std::dec << std::endl;
} }
void NetworkSimulatorGUI::toggleNetworkTraffic(bool b) void NetworkSimulatorGUI::toggleNetworkTraffic(bool b)

View File

@ -20,5 +20,7 @@ void MonitoredGRouterClient::sendMessage(const GRouterKeyId& destination_key_id)
item->data_size = 1000 + (RSRandom::random_u32()%1000) ; item->data_size = 1000 + (RSRandom::random_u32()%1000) ;
item->data_bytes = (unsigned char *)malloc(item->data_size) ; item->data_bytes = (unsigned char *)malloc(item->data_size) ;
RSRandom::random_bytes(item->data_bytes,item->data_size) ;
_grouter->sendData(destination_key_id,item) ; _grouter->sendData(destination_key_id,item) ;
} }

View File

@ -73,6 +73,8 @@ void Network::tick()
for(uint32_t i=0;i<n_nodes();++i) for(uint32_t i=0;i<n_nodes();++i)
node(i).tick() ; node(i).tick() ;
try
{
// Get items for each components and send them to their destination. // Get items for each components and send them to their destination.
// //
for(uint32_t i=0;i<n_nodes();++i) for(uint32_t i=0;i<n_nodes();++i)
@ -89,6 +91,10 @@ void Network::tick()
node.incoming(item) ; node.incoming(item) ;
} }
} }
}
catch(...)
{
}
} }
PeerNode& Network::node_by_id(const RsPeerId& id) PeerNode& Network::node_by_id(const RsPeerId& id)