progress in grouter code

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@6955 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2013-12-22 17:04:13 +00:00
parent dbfe3f6c51
commit fd994a546b
6 changed files with 270 additions and 21 deletions

View file

@ -31,6 +31,8 @@
#include "grouteritems.h"
#include "groutertypes.h"
static const uint32_t RS_GROUTER_ROUTING_WAITING_TIME = 3600 ; // time between two trial of sending a message
p3GRouter::p3GRouter(p3LinkMgr *lm)
: p3Service(RS_SERVICE_TYPE_GROUTER), p3Config(CONFIG_TYPE_GROUTER), mLinkMgr(lm), grMtx("GRouter")
{
@ -58,7 +60,7 @@ int p3GRouter::tick()
{
// route pending objects
//
routeObjects() ;
routePendingObjects() ;
last_autowash_time = now ;
autoWash() ;
@ -105,13 +107,161 @@ void p3GRouter::autoWash()
// cleanup cache
}
void p3GRouter::routeObjects()
void p3GRouter::routePendingObjects()
{
// Go through list of published keys
// broadcast a publishKeyItem for each of them.
//
// The routing rules are the following:
//
// Go through list of cached routing objects. For each object:
// if(Last try is old)
// put the object in pending list
//
// (This loop is costly (could handle lots of items), so it should be done less often.)
//
// Add peer to the list of tried routes with time stamp
// Keep the list of tried friends
//
// Go through list of pendign objects. For each object:
// Select one route direction
// - according to current probabilities from the routing matrix
// - according to list of previous attempts
//
// if(route found)
// forward item, update state and time stamp
// else
// if(I am not the sender)
// send back ACK(given up) // if I am the sender, I will keep trying.
//
// Item has received an ACK
//
// ACK: given up => change route
// ACK: received => Remove item from routed items
//
// The list in _pending_messages is necessarily short and most of the time empty. Once
// treated, objects are stored in _routing_cache, where they wait for an answer.
std::cerr << "p3GRouter::routeObjects() Unimplemented !!" << std::endl;
time_t now = time(NULL) ;
std::cerr << "p3GRouter::routeObjects() triage phase:" << std::endl;
std::cerr << "Cached Items : " << _pending_messages.size() << std::endl;
std::list<std::string> lst_tmp ;
std::list<SSLIdType> lst ;
mLinkMgr->getOnlineList(lst_tmp) ;
SSLIdType own_id( mLinkMgr->getOwnId() );
for(std::list<std::string>::const_iterator it(lst_tmp.begin());it!=lst_tmp.end();++it)
lst.push_back(SSLIdType(*it)) ;
for(std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it(_pending_messages.begin());it!=_pending_messages.end();)
if((it->second.status_flags & RS_GROUTER_ROUTING_STATE_PEND) || it->second.status_flags == RS_GROUTER_ROUTING_STATE_SENT && it->second.tried_friends.front().time_stamp+RS_GROUTER_ROUTING_WAITING_TIME < now)
{
std::cerr << " Msg id: " << std::hex << it->first << std::dec << std::endl;
std::cerr << " Origin: " << it->second.origin.toStdString() << std::endl;
std::cerr << " Last : " << it->second.tried_friends.front().friend_id.toStdString() << std::endl;
std::cerr << " Time : " << it->second.tried_friends.front().time_stamp << std::endl;
std::cerr << " Flags : " << it->second.status_flags << std::endl;
std::cerr << " Probabilities: " << std::endl;
std::map<SSLIdType,float> probas ; // friends probabilities for online friend list.
SSLIdType routed_friend ; // friend chosen for the next hop
float best_proba = 0.0f; // temp variable used to select the best proba
bool should_remove = false ; // should we remove this from the map?
// retrieve probabilities for this key.
//
if(! _routing_matrix.computeRoutingProbabilities(it->second.data_item->destination_key, lst, probas))
{
// key does not exist in routing matrix => send back an ACK(unknown)
std::cerr << " [Cannot compute. Unknown destination key!!] " << std::endl;
if(it->second.origin != own_id)
{
std::cerr << " removing item and notifying the sender (" << it->second.origin.toStdString() << ")" << std::endl;
sendACK(it->second.origin,it->first,RS_GROUTER_ACK_STATE_UNKNOWN) ;
// remove item from cache
//
should_remove = true ;
}
std::cerr << " item is ours. Keeping it until a route is known." << std::endl;
// else, select a routing friend at random, or just wait? Wait is probably better.
}
bool friend_found = false ;
for(std::map<SSLIdType,float>::const_iterator it2(probas.begin());it2!=probas.end();++it2)
{
std::cerr << " " << it2->first.toStdString() << " : " << it2->second << std::endl;
// select the peer with highest probability that hasn't yet been tried.
if(it2->second > best_proba && !(it2->first == it->second.tried_friends.front().friend_id))
{
routed_friend = it2->first ;
best_proba = it2->second ;
friend_found = true ;
}
}
std::cerr << " Best route: " << routed_friend.toStdString() << ", with probability " << best_proba << std::endl;
// now, send the item.
if(friend_found)
{
// make a deep copy of the item
RsGRouterGenericDataItem *new_item = it->second.data_item->duplicate() ;
// update cache entry
FriendTrialRecord ftr ;
ftr.time_stamp = now ;
ftr.friend_id = routed_friend ;
it->second.tried_friends.push_front(ftr) ;
it->second.status_flags |= RS_GROUTER_ROUTING_STATE_SENT ;
std::cerr << " Sending..." << std::endl;
// send
new_item->PeerId(routed_friend.toStdString()) ;
sendItem(new_item) ;
}
else if(!(it->second.status_flags & RS_GROUTER_ROUTING_STATE_OWND) || std::find(lst.begin(),lst.end(),it->second.origin) != lst.end())
{
// There's no correct friend to send this item to. We keep it for a while. If it's too old,
// we discard it. For now, the procedure is to send back an ACK.
std::cerr << " Item has no route candidate. It's too old. " << std::endl;
std::cerr << " sending ACK(no route) to peer " << it->second.origin.toStdString() << std::endl;
sendACK(it->second.origin,it->first,RS_GROUTER_ACK_STATE_NO_ROUTE) ;
should_remove = true ;
}
if(should_remove)
{
// We remove from the map. That means the RsItem* has been transfered to somewhere else.
//
std::cerr << " Removing item from pending items" << std::endl;
std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator tmp(it) ;
delete it->second.data_item ;
++tmp ;
_pending_messages.erase(it) ;
it = tmp ;
}
else
++it ;
}
else
std::cerr << "Skipping " << std::hex << it->first << std::dec << ", dest=" << it->second.data_item->destination_key.toStdString() << ", state = " << it->second.status_flags << ", stamp=" << it->second.tried_friends.front().time_stamp << " - " << it->second.tried_friends.front().friend_id.toStdString() << std::endl;
}
void p3GRouter::publishKeys()
{
// Go through list of published keys
@ -187,14 +337,14 @@ void p3GRouter::handleIncoming()
{
switch(item->PacketSubType())
{
case RS_PKT_SUBTYPE_GROUTER_PUBLISH_KEY: handleRecvPublishKeyItem(dynamic_cast<RsGRouterPublishKeyItem*>(item)) ;
break ;
case RS_PKT_SUBTYPE_GROUTER_PUBLISH_KEY: handleRecvPublishKeyItem(dynamic_cast<RsGRouterPublishKeyItem*>(item)) ;
break ;
case RS_PKT_SUBTYPE_GROUTER_DATA: handleRecvDataItem(dynamic_cast<RsGRouterGenericDataItem*>(item)) ;
break ;
case RS_PKT_SUBTYPE_GROUTER_DATA: handleRecvDataItem(dynamic_cast<RsGRouterGenericDataItem*>(item)) ;
break ;
case RS_PKT_SUBTYPE_GROUTER_ACK: handleRecvACKItem(dynamic_cast<RsGRouterACKItem*>(item)) ;
break ;
case RS_PKT_SUBTYPE_GROUTER_ACK: handleRecvACKItem(dynamic_cast<RsGRouterACKItem*>(item)) ;
break ;
default:
std::cerr << "(EE) " << __PRETTY_FUNCTION__ << ": Unhandled item type " << item->PacketSubType() << std::endl;
}
@ -236,18 +386,53 @@ void p3GRouter::handleRecvPublishKeyItem(RsGRouterPublishKeyItem *item)
void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item)
{
std::cerr << "Received data item item for key " << item->destination_key.toStdString() << std::endl;
std::cerr << "Received ACK item, mid=" << (void*)item->mid << std::endl;
// find the item in the pendign list,
// - if not found, drop.
//
// ...and act appropriately:
// - item was
// - if we're origin
// notify the client service
// else
// remove item
//
std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::iterator it(_pending_messages.find(item->mid)) ;
if(it == _pending_messages.end())
{
std::cerr << "p3GRouter::handleRecvACKItem(): cannot find entry for message id " << std::hex << item->mid << std::dec << ". Dropping it." << std::endl;
return ;
}
switch(item->state)
{
case RS_GROUTER_ACK_STATE_RECEIVED_INDIRECTLY:
case RS_GROUTER_ACK_STATE_RECEIVED: // received: - do we update the routing matrix?
// - and forward back
// Do nothing. It was indirectly received: fine. We don't need to notify the origin
// otherwise lots of ACKs will flow to the origin.
break ;
case RS_GROUTER_ACK_STATE_RECEIVED:
// Notify the origin. This is the main route and it was successful.
std::cerr << " forwarding ACK to origin: " << it->second.origin.toStdString() << std::endl;
sendACK(it->second.origin,item->mid,RS_GROUTER_ACK_STATE_RECEIVED) ;
break ;
case RS_GROUTER_ACK_STATE_GIVEN_UP: // route is bad. We forward back and update the routing matrix.
break ;
}
if(it->second.status_flags & RS_GROUTER_ROUTING_STATE_OWND)
{
// find the client service and notify it.
std::cerr << " We're owner: should notify client id" << std::endl;
}
// Always remove the item.
//
delete it->second.data_item ;
_pending_messages.erase(it) ;
}
void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item)
@ -268,6 +453,17 @@ void p3GRouter::sendData(const GRouterKeyId& destination, void *& item_data,uint
std::cerr << "(WW) " << __PRETTY_FUNCTION__ << ": not implemented." << std::endl;
}
void p3GRouter::sendACK(const SSLIdType& peer, GRouterMsgPropagationId mid, uint32_t ack_flags)
{
RsGRouterACKItem *item = new RsGRouterACKItem ;
item->state = ack_flags ;
item->mid = mid ;
item->PeerId(peer.toStdString()) ;
sendItem(item) ;
}
bool p3GRouter::loadList(std::list<RsItem*>& items)
{
std::cerr << "(WW) " << __PRETTY_FUNCTION__ << ": not implemented." << std::endl;
@ -276,6 +472,11 @@ bool p3GRouter::loadList(std::list<RsItem*>& items)
bool p3GRouter::saveList(bool&,std::list<RsItem*>& items)
{
std::cerr << "(WW) " << __PRETTY_FUNCTION__ << ": not implemented." << std::endl;
// We save
// - the routing clues
// - the pending items
return false ;
}