From e5a6dde5b91380b4591d42ceb82dbfb9a7045477 Mon Sep 17 00:00:00 2001 From: csoler Date: Sat, 22 Mar 2014 22:28:55 +0000 Subject: [PATCH] rewrote the logic of grouting algorithm to work without key publication. Some constants still need to be tweaked and branching factors is not computed yet. git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.6-IdCleaning@7200 b45a01b8-16f6-495d-af2f-9b41ad6348cc --- libretroshare/src/grouter/grouteritems.h | 2 + libretroshare/src/grouter/groutermatrix.cc | 5 +- libretroshare/src/grouter/groutermatrix.h | 2 +- libretroshare/src/grouter/groutertypes.h | 5 + libretroshare/src/grouter/p3grouter.cc | 273 ++++++++++++--------- libretroshare/src/grouter/p3grouter.h | 3 + 6 files changed, 164 insertions(+), 126 deletions(-) diff --git a/libretroshare/src/grouter/grouteritems.h b/libretroshare/src/grouter/grouteritems.h index 08aa511ec..38d768ee7 100644 --- a/libretroshare/src/grouter/grouteritems.h +++ b/libretroshare/src/grouter/grouteritems.h @@ -50,6 +50,7 @@ const uint32_t RS_GROUTER_ACK_STATE_RECEIVED_INDIRECTLY = 0x0002 ; // data was const uint32_t RS_GROUTER_ACK_STATE_GIVEN_UP = 0x0003 ; // data was given up. No route. const uint32_t RS_GROUTER_ACK_STATE_NO_ROUTE = 0x0004 ; // data was given up. No route. const uint32_t RS_GROUTER_ACK_STATE_UNKNOWN = 0x0005 ; // unknown destination key +const uint32_t RS_GROUTER_ACK_STATE_TOO_FAR = 0x0006 ; // dropped because of distance /***********************************************************************************/ /* Basic GRouter Item Class */ @@ -151,6 +152,7 @@ class RsGRouterGenericDataItem: public RsGRouterItem, public RsGRouterNonCopyabl // GRouterMsgPropagationId routing_id ; GRouterKeyId destination_key ; + uint32_t randomized_distance ; uint32_t data_size ; uint8_t *data_bytes; diff --git a/libretroshare/src/grouter/groutermatrix.cc b/libretroshare/src/grouter/groutermatrix.cc index dfd670225..746b3dfbb 100644 --- a/libretroshare/src/grouter/groutermatrix.cc +++ b/libretroshare/src/grouter/groutermatrix.cc @@ -32,8 +32,7 @@ GRouterMatrix::GRouterMatrix() _proba_need_updating = true ; } -bool GRouterMatrix::addRoutingClue( const GRouterKeyId& key_id,const GRouterServiceId& sid,float distance, - const std::string& desc_string,const RsPeerId& source_friend) +bool GRouterMatrix::addRoutingClue(const GRouterKeyId& key_id,const RsPeerId& source_friend,float weight) { // 1 - get the friend index. // @@ -44,7 +43,7 @@ bool GRouterMatrix::addRoutingClue( const GRouterKeyId& key_id,const GRouterServ time_t now = time(NULL) ; RoutingMatrixHitEntry rc ; - rc.weight = 1.0f / (1.0f + distance) ; + rc.weight = weight ; rc.time_stamp = now ; rc.friend_id = fid ; diff --git a/libretroshare/src/grouter/groutermatrix.h b/libretroshare/src/grouter/groutermatrix.h index f7a39296d..549adef37 100644 --- a/libretroshare/src/grouter/groutermatrix.h +++ b/libretroshare/src/grouter/groutermatrix.h @@ -60,7 +60,7 @@ class GRouterMatrix // Record one routing clue. The events can possibly be merged in time buckets. // - bool addRoutingClue(const GRouterKeyId& id,const GRouterServiceId& sid,float distance,const std::string& desc_string,const RsPeerId& source_friend) ; + bool addRoutingClue(const GRouterKeyId& id,const RsPeerId& source_friend,float weight) ; // Dump info in terminal. // diff --git a/libretroshare/src/grouter/groutertypes.h b/libretroshare/src/grouter/groutertypes.h index 96e5ee125..479b62889 100644 --- a/libretroshare/src/grouter/groutertypes.h +++ b/libretroshare/src/grouter/groutertypes.h @@ -50,6 +50,8 @@ static const time_t RS_GROUTER_PUBLISH_CAMPAIGN_PERIOD = 1 *60 ; // Check static const time_t RS_GROUTER_PUBLISH_KEY_TIME_INTERVAL = 2 *60 ; // Advertise each key once a day at most. static const time_t RS_GROUTER_ROUTING_WAITING_TIME = 3600 ; // time between two trial of sending a given message static const time_t RS_GROUTER_KEY_DIFFUSION_MAX_KEEP = 7200 ; // time to keep key diffusion items in cache, to avoid multiple diffusion. +static const uint32_t GROUTER_ITEM_DISTANCE_UNIT = 100 ; // One unit of distance between two peers +static const uint32_t GROUTER_ITEM_MAX_TRAVEL_DISTANCE = 10*100 ; // 10 distance units. That is a lot. static const uint32_t RS_GROUTER_ROUTING_STATE_UNKN = 0x0000 ; // unknown. Unused. static const uint32_t RS_GROUTER_ROUTING_STATE_PEND = 0x0001 ; // item is pending. Should be sent asap. @@ -70,6 +72,8 @@ struct FriendTrialRecord { RsPeerId friend_id ; // id of the friend time_t time_stamp ; // time of the last tried + float probability ; // probability at which the item was selected + int nb_friends ; // number of friends at the time of sending the item }; class GRouterRoutingInfo @@ -81,5 +85,6 @@ class GRouterRoutingInfo std::list tried_friends ; // list of friends to which the item was sent ordered with time. RsPeerId origin ; // which friend sent us that item time_t received_time ; // time at which the item was received + uint32_t randomized_distance ; // distance of travel, randomized at each hop. }; diff --git a/libretroshare/src/grouter/p3grouter.cc b/libretroshare/src/grouter/p3grouter.cc index a39f29c7c..d21b9cf4f 100644 --- a/libretroshare/src/grouter/p3grouter.cc +++ b/libretroshare/src/grouter/p3grouter.cc @@ -68,6 +68,17 @@ // * keep the local routing info in a cache that is saved (Which peer issued the msg) // - which probability was used to chose this friend (will be useful // to compute the routing contribution if the msg is ACK-ed) +// +// Two probabilities are computed: +// - routing probabilities among connected friends +// * this is computed by the routing matrix +// - branching factor N +// * depends on the depth of the items +// * depends on the distribution of probabilities (min and max) +// +// Once computed, +// - the item is forwarded randomly to N peers drawn from the list of connected peers with the given probabilities. +// - the depth of the item is incremented randomly // // - downward: look into routing cache. If info not present, drop the item. // Forward item into stored direction. @@ -137,6 +148,7 @@ // - packet service ID (Can be messenging, channels, etc). // - packet data (void* + size_t) // - flags (such as ACK or response required, and packet direction) +// - routed directions and probabilities // * ACK packet. // - packet unique ID (the id of the corresponding data) // - flags (reason for ACK. Could be data delivered, or error, too far, etc) @@ -285,46 +297,13 @@ void p3GRouter::routePendingObjects() { RsStackMutex mtx(grMtx) ; - // Go through list of published keys - // broadcast a publishKeyItem for each of them. - // - // The routing rules are the following: - // - // Go through list of cached routing objects. For each object: - // if(Last try is old) - // put the object in pending list - // - // (This loop is costly (could handle lots of items), so it should be done less often.) - // - // Add peer to the list of tried routes with time stamp - // Keep the list of tried friends - // - // Go through list of pendign objects. For each object: - // Select one route direction - // - according to current probabilities from the routing matrix - // - according to list of previous attempts - // - // if(route found) - // forward item, update state and time stamp - // else - // if(I am not the sender) - // send back ACK(given up) // if I am the sender, I will keep trying. - // - // Item has received an ACK - // - // ACK: given up => change route - // ACK: received => Remove item from routed items - // - // The list in _pending_messages is necessarily short and most of the time empty. Once - // treated, objects are stored in _routing_cache, where they wait for an answer. - time_t now = time(NULL) ; std::cerr << "p3GRouter::routeObjects() triage phase:" << std::endl; std::cerr << "Cached Items : " << _pending_messages.size() << std::endl; std::list lst ; - mLinkMgr->getOnlineList(lst) ; + mLinkMgr->getOnlineList(lst) ; RsPeerId own_id( mLinkMgr->getOwnId() ); for(std::map::iterator it(_pending_messages.begin());it!=_pending_messages.end();) @@ -339,54 +318,28 @@ void p3GRouter::routePendingObjects() std::map probas ; // friends probabilities for online friend list. RsPeerId routed_friend ; // friend chosen for the next hop - float best_proba = 0.0f; // temp variable used to select the best proba bool should_remove = false ; // should we remove this from the map? - // retrieve probabilities for this key. + // Retrieve probabilities for this key. This call always succeeds. If no route is known, all probabilities become equal. // - if(! _routing_matrix.computeRoutingProbabilities(it->second.data_item->destination_key, lst, probas)) + _routing_matrix.computeRoutingProbabilities(it->second.data_item->destination_key, lst, probas) ; + + // Compute the branching factor. + + int N = computeBranchingFactor(probas,it->second.randomized_distance) ; + + // Now use this to select N random peers according to the given probabilities + + std::set routing_friends = computeRoutingFriends(probas,N) ; + + std::cerr << " Routing statistics: " << std::endl; + + // Actually send the item. + + for(std::set::const_iterator its(routing_friends.begin());its!=routing_friends.end();++its) { - // key does not exist in routing matrix => send back an ACK(unknown) + std::cerr << " Friend : " << (*its) << std::endl; - std::cerr << " [Cannot compute. Unknown destination key!!] " << std::endl; - - if(it->second.origin != own_id) - { - std::cerr << " removing item and notifying the sender (" << it->second.origin.toStdString() << ")" << std::endl; - - sendACK(it->second.origin,it->first,RS_GROUTER_ACK_STATE_UNKNOWN) ; - - // remove item from cache - // - should_remove = true ; - } - std::cerr << " item is ours. Keeping it until a route is known." << std::endl; - - // else, select a routing friend at random, or just wait? Wait is probably better. - } - - bool friend_found = false ; - - for(std::map::const_iterator it2(probas.begin());it2!=probas.end();++it2) - { - std::cerr << " " << it2->first.toStdString() << " : " << it2->second << std::endl; - - // select the peer with highest probability that hasn't yet been tried. - - if(it2->second > best_proba && !(it2->first == it->second.tried_friends.front().friend_id)) - { - routed_friend = it2->first ; - best_proba = it2->second ; - friend_found = true ; - } - } - - std::cerr << " Best route: " << routed_friend.toStdString() << ", with probability " << best_proba << std::endl; - - // now, send the item. - - if(friend_found) - { // make a deep copy of the item RsGRouterGenericDataItem *new_item = it->second.data_item->duplicate() ; @@ -394,26 +347,20 @@ void p3GRouter::routePendingObjects() FriendTrialRecord ftr ; ftr.time_stamp = now ; ftr.friend_id = routed_friend ; + ftr.probability = probas[*its] ; + ftr.nb_friends = probas.size() ; + it->second.tried_friends.push_front(ftr) ; it->second.status_flags = RS_GROUTER_ROUTING_STATE_SENT ; - std::cerr << " Sending..." << std::endl; + std::cerr << " Routing probability: " << ftr.probability << std::endl; + std::cerr << " Sending..." << std::endl; + // send - new_item->PeerId(routed_friend) ; + new_item->PeerId(*its) ; sendItem(new_item) ; } - else if(it->second.origin != mLinkMgr->getOwnId() || std::find(lst.begin(),lst.end(),it->second.origin) != lst.end()) - { - // There's no correct friend to send this item to. We keep it for a while. If it's too old, - // we discard it. For now, the procedure is to send back an ACK. - std::cerr << " Item has no route candidate. It's too old. " << std::endl; - std::cerr << " sending ACK(no route) to peer " << it->second.origin.toStdString() << std::endl; - - sendACK(it->second.origin,it->first,RS_GROUTER_ACK_STATE_NO_ROUTE) ; - - should_remove = true ; - } if(should_remove) { @@ -431,7 +378,32 @@ void p3GRouter::routePendingObjects() ++it ; } else + { std::cerr << "Skipping " << std::hex << it->first << std::dec << ", dest=" << it->second.data_item->destination_key.toStdString() << ", state = " << it->second.status_flags << ", stamp=" << it->second.tried_friends.front().time_stamp << " - " << it->second.tried_friends.front().friend_id.toStdString() << std::endl; + ++it ; + } +} + +uint32_t p3GRouter::computeBranchingFactor(const std::map& probas,uint32_t dist) +{ + // This should be made a bit more adaptive ;-) + // + return 1 ; +} + +std::set p3GRouter::computeRoutingFriends(const std::map& probas,uint32_t N) +{ + // We draw N friends according to the routing probabilitites that are passed as parameter. + // + + std::set res ; + + if(probas.empty()) + return res ; + + res.insert(probas.begin()->first) ; + + return res ; } void p3GRouter::publishKeys() @@ -575,7 +547,7 @@ void p3GRouter::handleRecvPublishKeyItem(RsGRouterPublishKeyItem *item) // update the route matrix - _routing_matrix.addRoutingClue(item->published_key,item->service_id,item->randomized_distance,item->description_string,RsPeerId(item->PeerId())) ; + _routing_matrix.addRoutingClue(item->published_key,RsPeerId(item->PeerId()),1) ; // forward the key to other peers according to key forwarding cache @@ -602,7 +574,7 @@ void p3GRouter::handleRecvPublishKeyItem(RsGRouterPublishKeyItem *item) void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item) { RsStackMutex mtx(grMtx) ; - std::cerr << "Received ACK item, mid=" << std::hex << item->mid << std::dec << std::endl; + std::cerr << "Received ACK item, mid=" << std::hex << item->mid << std::dec << ", ACK type = "<< item->state << std::endl; // find the item in the pendign list, // - if not found, drop. @@ -624,44 +596,79 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item) switch(item->state) { case RS_GROUTER_ACK_STATE_RECEIVED_INDIRECTLY: - // Do nothing. It was indirectly received: fine. We don't need to notify the origin - // otherwise lots of ACKs will flow to the origin. - break ; case RS_GROUTER_ACK_STATE_RECEIVED: // Notify the origin. This is the main route and it was successful. + std::cerr << " updating routing matrix." << std::endl; + + it->second.status_flags = RS_GROUTER_ROUTING_STATE_ARVD ; + + { + #warning UNFINISHED code. + + // Now compute the weight for that particular item. See with what probabilities it was chosen. + // + float weight = (item->state == RS_GROUTER_ACK_STATE_RECEIVED)?1.0f : 0.5; + std::cerr << " weight = " << weight << std::endl; + + + _routing_matrix.addRoutingClue(it->second.data_item->destination_key,item->PeerId(),weight) ; + } + std::cerr << " forwarding ACK to origin: " << it->second.origin.toStdString() << std::endl; - sendACK(it->second.origin,item->mid,RS_GROUTER_ACK_STATE_RECEIVED) ; + sendACK(it->second.origin,item->mid,item->state) ; break ; case RS_GROUTER_ACK_STATE_GIVEN_UP: // route is bad. We forward back and update the routing matrix. break ; } - if(it->second.origin == mLinkMgr->getOwnId()) + if(it->second.origin == mLinkMgr->getOwnId()) { // find the client service and notify it. std::cerr << " We're owner: should notify client id" << std::endl; } - // Always remove the item. + // Just decrement the list of tried friends // - delete it->second.data_item ; - _pending_messages.erase(it) ; + for(std::list::iterator it2(it->second.tried_friends.begin());it2!=it->second.tried_friends.end();++it2) + if( (*it2).friend_id == item->PeerId()) + { + std::cerr << " Removing friend try for peer " << item->PeerId() << ". " << it->second.tried_friends.size() << " tries left." << std::endl; + it->second.tried_friends.erase(it2) ; + break ; + } + + if(it->second.tried_friends.empty()) + { + delete it->second.data_item ; + _pending_messages.erase(it) ; + + std::cerr << " No tries left. Removing item from pending list." << std::endl; + } } void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item) { RsStackMutex mtx(grMtx) ; - std::cerr << "Received data item for key " << item->destination_key << std::endl; + std::cerr << "Received data item for key " << item->destination_key << ", distance = " << item->randomized_distance << std::endl; + + // check the item depth. If too large, send a ACK back. + + if(item->randomized_distance > GROUTER_ITEM_MAX_TRAVEL_DISTANCE) + { + std::cerr << " Distance is too large: " << item->randomized_distance << " units. Item is dropped." << std::endl; + sendACK(item->PeerId(),item->routing_id,RS_GROUTER_ACK_STATE_TOO_FAR) ; + return ; + } // Do we have this item in the cache already? // - if not, add in the pending items // - if yet. Ignore, or send ACK for shorter route. std::map::const_iterator it = _owned_key_ids.find(item->destination_key) ; - std::map::const_iterator itr = _pending_messages.find(item->routing_id) ; + std::map::iterator itr = _pending_messages.find(item->routing_id) ; RsGRouterGenericDataItem *item_copy = NULL; if(itr != _pending_messages.end()) @@ -670,7 +677,7 @@ void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item) item_copy = itr->second.data_item ; } - else // item is now known. Store it into pending msgs. We make a copy, since the item will be deleted otherwise. + else // item is not known. Store it into pending msgs. We make a copy, since the item will be deleted otherwise. { std::cerr << " Item is new. Storing in cache as pending messages." << std::endl; @@ -679,38 +686,46 @@ void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item) info.data_item = item->duplicate() ; item_copy = info.data_item ; - if(it != _owned_key_ids.end()) - info.status_flags = RS_GROUTER_ROUTING_STATE_ARVD ; - else - info.status_flags = RS_GROUTER_ROUTING_STATE_PEND ; - info.origin = RsPeerId(item->PeerId()) ; info.received_time = time(NULL) ; _pending_messages[item->routing_id] = info ; + itr = _pending_messages.find(item->routing_id) ; } // Is the item for us? If so, find the client service and send the item back. - + // if(it != _owned_key_ids.end()) if(time(NULL) < it->second.validity_time) { - // test validity time. If too old, we don't forward. - - std::map::const_iterator its = _registered_services.find(it->second.service_id) ; - - if(its != _registered_services.end()) - { - std::cerr << " Key is owned by us. Notifying service for this item." << std::endl; - its->second->receiveGRouterData(it->first,item_copy) ; - } + if(itr->second.status_flags == RS_GROUTER_ROUTING_STATE_ARVD) + sendACK(item->PeerId(), item->routing_id, RS_GROUTER_ACK_STATE_RECEIVED_INDIRECTLY) ; else - std::cerr << " (EE) weird situation. No service registered for a key that we own. Key id = " << item->destination_key.toStdString() << ", service id = " << it->second.service_id << std::endl; + { + sendACK(item->PeerId(), item->routing_id, RS_GROUTER_ACK_STATE_RECEIVED) ; + itr->second.status_flags = RS_GROUTER_ROUTING_STATE_ARVD ; + + std::map::const_iterator its = _registered_services.find(it->second.service_id) ; + + if(its != _registered_services.end()) + { + std::cerr << " Key is owned by us. Notifying service for this item." << std::endl; + its->second->receiveGRouterData(it->first,item_copy) ; + } + else + std::cerr << " (EE) weird situation. No service registered for a key that we own. Key id = " << item->destination_key.toStdString() << ", service id = " << it->second.service_id << std::endl; + } } else - std::cerr << " (WW) key is outdated. Dropping this item." << std::endl; + { + std::cerr << " key is outdated. The item will be dropped." << std::endl; + return ; + } else - std::cerr << " Item is not for us. Leaving in pending msgs to be routed later." << std::endl; + { + std::cerr << " item is not for us. Storing in pending mode." << std::endl; + itr->second.status_flags = RS_GROUTER_ROUTING_STATE_PEND ; + } _changed = true ; } @@ -739,9 +754,13 @@ void p3GRouter::sendData(const GRouterKeyId& destination, RsGRouterGenericDataIt GRouterMsgPropagationId propagation_id ; do { propagation_id = RSRandom::random_u32(); } while(_pending_messages.find(propagation_id) != _pending_messages.end()) ; + item->destination_key = destination ; + item->routing_id = propagation_id ; + std::cerr << "p3GRouter::sendGRouterData(): pushing the followign item in the msg pending list:" << std::endl; std::cerr << " data_item.size = " << info.data_item->data_size << std::endl; std::cerr << " data_item.byte = " << info.data_item->data_bytes << std::endl; + std::cerr << " destination = " << info.data_item->destination_key << std::endl; std::cerr << " status = " << info.status_flags << std::endl; std::cerr << " origin = " << info.origin.toStdString() << std::endl; std::cerr << " Recv time = " << info.received_time << std::endl; @@ -854,7 +873,17 @@ void p3GRouter::debugDump() std::cerr << " [Not shown yet] " << std::endl; std::cerr << " Data items: " << std::endl; - std::cerr << " [Not shown yet] " << std::endl; + + static std::string statusString[4] = { "Unkn","Pend","Sent","Ackn" }; + + for(std::map::iterator it(_pending_messages.begin());it!=_pending_messages.end();++it) + std::cerr << " Msg id: " << std::hex << it->first << std::dec + << " Origin: " << it->second.origin.toStdString() + << " Time : " << now - it->second.tried_friends.front().time_stamp << " secs ago." + << " Status: " << statusString[it->second.status_flags] << std::endl; + +// << " Last : " << it->second.tried_friends.front().friend_id.toStdString() << std::endl; +// << " Probabilities: " << std::endl; std::cerr << " Routing matrix: " << std::endl; diff --git a/libretroshare/src/grouter/p3grouter.h b/libretroshare/src/grouter/p3grouter.h index 998a5d2b1..1bf5e2d17 100644 --- a/libretroshare/src/grouter/p3grouter.h +++ b/libretroshare/src/grouter/p3grouter.h @@ -133,6 +133,9 @@ class p3GRouter: public RsGRouter, public p3Service, public p3Config void debugDump() ; void locked_forwardKey(const RsGRouterPublishKeyItem&) ; + static uint32_t computeBranchingFactor(const std::map& probas,uint32_t dist) ; + static std::set computeRoutingFriends(const std::map& probas,uint32_t N) ; + //===================================================// // p3Config methods // //===================================================//