added grouter as a msg sending service. Not enable yet. Added serialisation for grouter items and test methods in tests/serialiser/. Added saveList/loadList for grouter. set connectToTurtleRouter() and connectToGRouter() to be mandatory methods

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@6970 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2013-12-27 20:06:47 +00:00
parent da1b6ac845
commit 5c52890ad5
17 changed files with 784 additions and 80 deletions

View file

@ -32,8 +32,6 @@
#include "groutertypes.h"
#include "grouterclientservice.h"
static const uint32_t RS_GROUTER_ROUTING_WAITING_TIME = 3600 ; // time between two trial of sending a message
p3GRouter::p3GRouter(p3LinkMgr *lm)
: p3Service(RS_SERVICE_TYPE_GROUTER), p3Config(CONFIG_TYPE_GROUTER), mLinkMgr(lm), grMtx("GRouter")
{
@ -75,6 +73,7 @@ int p3GRouter::tick()
if(now > last_publish_campaign_time + RS_GROUTER_PUBLISH_CAMPAIGN_PERIOD)
{
last_publish_campaign_time = now ;
publishKeys() ;
_routing_matrix.updateRoutingProbabilities() ;
}
@ -103,13 +102,34 @@ void p3GRouter::autoWash()
{
RsStackMutex mtx(grMtx) ;
std::cerr << "p3GRouter::autoWash() Unimplemented !!" << std::endl;
std::cerr << "p3GRouter::autoWash(): cleaning old entried." << std::endl;
// cleanup cache
time_t now = time(NULL) ;
for(std::map<GRouterKeyPropagationId,time_t>::iterator it(_key_diffusion_time_stamps.begin());it!=_key_diffusion_time_stamps.end();)
if(it->second + RS_GROUTER_KEY_DIFFUSION_MAX_KEEP < now)
{
std::cerr << " Removing key diffusion time stamp " << it->second << " for diffusion id " << std::hex << it->first << std::dec << std::endl;
std::map<GRouterKeyPropagationId,time_t>::iterator tmp(it) ;
++tmp ;
_key_diffusion_time_stamps.erase(it) ;
it = tmp ;
}
else
++it ;
// look into pending items.
std::cerr << " Pending key diffusion items: " << _key_diffusion_items.size() << std::endl;
std::cerr << " Pending messages to route : " << _pending_messages.size() << std::endl;
}
void p3GRouter::routePendingObjects()
{
RsStackMutex mtx(grMtx) ;
// Go through list of published keys
// broadcast a publishKeyItem for each of them.
//
@ -157,7 +177,7 @@ void p3GRouter::routePendingObjects()
lst.push_back(SSLIdType(*it)) ;
for(std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it(_pending_messages.begin());it!=_pending_messages.end();)
if((it->second.status_flags & RS_GROUTER_ROUTING_STATE_PEND) || it->second.status_flags == RS_GROUTER_ROUTING_STATE_SENT && it->second.tried_friends.front().time_stamp+RS_GROUTER_ROUTING_WAITING_TIME < now)
if((it->second.status_flags & RS_GROUTER_ROUTING_STATE_PEND) || (it->second.status_flags == RS_GROUTER_ROUTING_STATE_SENT && it->second.tried_friends.front().time_stamp+RS_GROUTER_ROUTING_WAITING_TIME < now))
{
std::cerr << " Msg id: " << std::hex << it->first << std::dec << std::endl;
std::cerr << " Origin: " << it->second.origin.toStdString() << std::endl;
@ -265,12 +285,12 @@ void p3GRouter::routePendingObjects()
void p3GRouter::publishKeys()
{
RsStackMutex mtx(grMtx) ;
// Go through list of published keys
// broadcast a publishKeyItem for each of them.
time_t now = time(NULL) ;
std::list<std::string> connected_peers ;
mLinkMgr->getOnlineList(connected_peers) ;
for(std::map<GRouterKeyId, GRouterPublishedKeyInfo>::iterator it(_owned_key_ids.begin());it!=_owned_key_ids.end();++it)
{
@ -291,25 +311,41 @@ void p3GRouter::publishKeys()
item.service_id = info.service_id ;
item.randomized_distance = drand48() ;
item.description_string = info.description_string ;
item.PeerId("") ; // no peer id => key is forwarded to all friends.
// get list of connected friends, and broadcast to all of them
//
for(std::list<std::string>::const_iterator it(connected_peers.begin());it!=connected_peers.end();++it)
{
std::cerr << " sending to " << (*it) << std::endl;
locked_forwardKey(item) ;
RsGRouterPublishKeyItem *itm = new RsGRouterPublishKeyItem(item) ;
itm->PeerId(*it) ;
// we should randomise the depth
sendItem(itm) ;
}
info.last_published_time = now ;
}
}
}
void p3GRouter::locked_forwardKey(const RsGRouterPublishKeyItem& item)
{
std::list<std::string> connected_peers ;
mLinkMgr->getOnlineList(connected_peers) ;
std::cerr << " Forwarding key item to all available friends..." << std::endl;
// get list of connected friends, and broadcast to all of them
//
for(std::list<std::string>::const_iterator it(connected_peers.begin());it!=connected_peers.end();++it)
if(item.PeerId() != *it)
{
std::cerr << " sending to " << (*it) << std::endl;
RsGRouterPublishKeyItem *itm = new RsGRouterPublishKeyItem(item) ;
itm->PeerId(*it) ;
// we should randomise the depth
sendItem(itm) ;
}
else
std::cerr << " Not forwarding to source id " << item.PeerId() << std::endl;
}
bool p3GRouter::registerKey(const GRouterKeyId& key,const GRouterServiceId& client_id,const std::string& description)
{
RsStackMutex mtx(grMtx) ;
@ -355,6 +391,8 @@ void p3GRouter::handleIncoming()
void p3GRouter::handleRecvPublishKeyItem(RsGRouterPublishKeyItem *item)
{
RsStackMutex mtx(grMtx) ;
std::cerr << "Received key publish item for key :" << std::endl ;
std::cerr << " diffusion = " << std::hex << item->diffusion_id << std::dec << std::endl ;
std::cerr << " key id = " << item->published_key.toStdString() << std::endl ;
@ -369,25 +407,27 @@ void p3GRouter::handleRecvPublishKeyItem(RsGRouterPublishKeyItem *item)
// forward the key to other peers according to key forwarding cache
std::map<GRouterKeyPropagationId,time_t>::iterator it = _key_diffusion_time_stamps.find(item->diffusion_id) ;
bool found = false ;
if(it != _key_diffusion_time_stamps.end()) // found. We don't propagate further
found = true ;
bool found = (it != _key_diffusion_time_stamps.end()); // found. We don't propagate further
_key_diffusion_time_stamps[item->diffusion_id] = time(NULL) ; // always stamp
if(found)
{
std::cerr << " Key diffusion item already in cache. Not forwardign further." << std::endl;
return ;
}
// Propagate the item to all other online friends. We don't do this right now, but push items in a queue.
// Doing this we can control the amount of key propagation and avoid flooding.
_key_diffusion_items.push(item) ;
locked_forwardKey(*item) ;
}
void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item)
{
std::cerr << "Received ACK item, mid=" << (void*)item->mid << std::endl;
RsStackMutex mtx(grMtx) ;
std::cerr << "Received ACK item, mid=" << std::hex << item->mid << std::dec << std::endl;
// find the item in the pendign list,
// - if not found, drop.
@ -438,6 +478,7 @@ void p3GRouter::handleRecvACKItem(RsGRouterACKItem *item)
void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item)
{
RsStackMutex mtx(grMtx) ;
std::cerr << "Received data item for key " << item->destination_key.toStdString() << std::endl;
// Do we have this item in the cache already?
@ -497,8 +538,16 @@ void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item)
std::cerr << " Item is not for us. Leaving in pending msgs to be routed later." << std::endl;
}
bool p3GRouter::registerClientService(const GRouterServiceId& id,GRouterClientService *service)
{
RsStackMutex mtx(grMtx) ;
_registered_services[id] = service ;
return true ;
}
void p3GRouter::sendData(const GRouterKeyId& destination, RsGRouterGenericDataItem *item)
{
RsStackMutex mtx(grMtx) ;
// push the item into pending messages.
//
GRouterRoutingInfo info ;
@ -511,7 +560,7 @@ void p3GRouter::sendData(const GRouterKeyId& destination, RsGRouterGenericDataIt
// Make sure we have a unique id (at least locally).
//
GRouterMsgPropagationId propagation_id ;
do propagation_id = RSRandom::random_u32() ; while(_pending_messages.find(propagation_id) != _pending_messages.end()) ;
do { propagation_id = RSRandom::random_u32(); } while(_pending_messages.find(propagation_id) != _pending_messages.end()) ;
std::cerr << "p3GRouter::sendGRouterData(): pushing the followign item in the msg pending list:" << std::endl;
std::cerr << " data_item.size = " << info.data_item->data_size << std::endl;
@ -536,24 +585,71 @@ void p3GRouter::sendACK(const SSLIdType& peer, GRouterMsgPropagationId mid, uint
bool p3GRouter::loadList(std::list<RsItem*>& items)
{
std::cerr << "(WW) " << __PRETTY_FUNCTION__ << ": not implemented." << std::endl;
return false ;
}
bool p3GRouter::saveList(bool&,std::list<RsItem*>& items)
{
std::cerr << "(WW) " << __PRETTY_FUNCTION__ << ": not implemented." << std::endl;
RsStackMutex mtx(grMtx) ;
std::cerr << "p3GRouter::loadList() : " << std::endl;
_routing_matrix.loadList(items) ;
// remove all existing objects.
//
std::cerr << " removing all existing items (" << _pending_messages.size() << " items to delete)." << std::endl;
for(std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::iterator it(_pending_messages.begin());it!=_pending_messages.end();++it)
delete it->second.data_item ;
_pending_messages.clear() ;
for(std::list<RsItem*>::const_iterator it(items.begin());it!=items.end();++it)
{
RsGRouterRoutingInfoItem *itm1 = NULL ;
if(NULL != (itm1 = dynamic_cast<RsGRouterRoutingInfoItem*>(*it)))
{
_pending_messages[itm1->data_item->routing_id] = *itm1 ;
_pending_messages[itm1->data_item->routing_id].data_item = itm1->data_item ; // avoids duplication.
itm1->data_item = NULL ; // prevents deletion.
}
delete *it ;
}
return true ;
}
bool p3GRouter::saveList(bool& cleanup,std::list<RsItem*>& items)
{
// We save
// - the routing clues
// - the pending items
return false ;
cleanup = true ; // the client should delete the items.
std::cerr << "p3GRouter::saveList()..." << std::endl;
std::cerr << " saving routing clues." << std::endl;
_routing_matrix.saveList(items) ;
std::cerr << " saving pending items." << std::endl;
for(std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::const_iterator it(_pending_messages.begin());it!=_pending_messages.end();++it)
{
RsGRouterRoutingInfoItem *item = new RsGRouterRoutingInfoItem ;
*(GRouterRoutingInfo*)item = it->second ; // copy all members
item->data_item = it->second.data_item->duplicate() ; // deep copy, because we call delete on the object, and the item might be removed before we handle it in the client.
items.push_back(item) ;
}
return true ;
}
// Dump everything
//
void p3GRouter::debugDump()
{
RsStackMutex mtx(grMtx) ;
time_t now = time(NULL) ;
std::cerr << "Full dump of Global Router state: " << std::endl;