fixed up cache loading issues on get key. Polished grouter stats. Improved sending logic. Distant msgs now work 100% (sync-ed)

git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.6-NewGRouterModel@7860 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2015-01-23 12:13:13 +00:00
parent 1998ddd765
commit e0308eacd2
6 changed files with 182 additions and 171 deletions

View file

@ -217,6 +217,8 @@ RsGRouterRoutingInfoItem *RsGRouterSerialiser::deserialise_RsGRouterRoutingInfoI
else
ok = false ;
}
else
item->receipt_item = NULL ;
if (offset != rssize || !ok)

View file

@ -59,7 +59,7 @@
//
// Decentralized routing algorithm:
// - tick() method
// * calls autoWash(), send() and receive()
// * calls send() and receive()
//
// - message passing
// - upward:
@ -126,9 +126,6 @@
// bool computeRoutingProbabilities(RSAKeyIDType id, const std::vector<SSLIdType>& friends,
// std::vector<float>& probas) const ;
//
// // remove oldest entries.
// bool autoWash() ;
//
// // Record one routing clue. The events can possibly be merged in time buckets.
// //
// bool addRoutingEvent(RSAKeyIDType id,const SSLIdType& which friend) ;
@ -206,6 +203,7 @@ static const uint32_t MAX_TUNNEL_UNMANAGED_TIME = 600 ; // min time bef
static const uint32_t MAX_DELAY_BETWEEN_TWO_SEND = 120 ; // wait for 120 seconds before re-sending.
static const uint32_t TUNNEL_OK_WAIT_TIME = 10 ; // wait for 10 seconds after last tunnel ok, so that we have a complete set of tunnels.
static const uint32_t MAX_GROUTER_DATA_SIZE = 2*1024*1024 ; // 2MB size limit. This is of course arbitrary.
static const uint32_t MAX_RECEIPT_WAIT_TIME = 20 ; // wait for at most 20 secs for a receipt. If not, cancel.
const std::string p3GRouter::SERVICE_INFO_APP_NAME = "Global Router" ;
@ -230,14 +228,6 @@ int p3GRouter::tick()
time_t now = time(NULL) ;
routePendingObjects() ;
if(now > _last_autowash_time + RS_GROUTER_AUTOWASH_PERIOD)
{
// route pending objects
//
_last_autowash_time = now ;
autoWash() ;
}
// Go through the list of active tunnel requests and pending objects to ask for new tunnels
// or close existing tunnel requests.
//
@ -304,57 +294,6 @@ RsSerialiser *p3GRouter::setupSerialiser()
return rss ;
}
void p3GRouter::autoWash()
{
std::map<GRouterMsgPropagationId,GRouterClientService *> cancelled_msgs ;
{
RsStackMutex mtx(grMtx) ;
#ifdef GROUTER_DEBUG
grouter_debug() << "p3GRouter::autoWash(): cleaning old entried." << std::endl;
#endif
// cleanup cache
time_t now = time(NULL) ;
for(std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::iterator it(_pending_messages.begin());it!=_pending_messages.end();)
if(it->second.received_time_TS + GROUTER_ITEM_MAX_CACHE_KEEP_TIME < now) // is the item too old for cache
{
#ifdef GROUTER_DEBUG
grouter_debug() << " Removing cached item " << std::hex << it->first << std::dec << std::endl;
#endif
GRouterClientService *client = NULL ;
GRouterServiceId service_id = 0;
if(!locked_getClientAndServiceId(it->second.tunnel_hash,it->second.data_item->destination_key,client,service_id))
{
std::cerr << " ERROR: cannot find client for cancelled message " << it->first << std::endl;
}
else
cancelled_msgs[it->first] = client ;
delete it->second.data_item ;
std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::iterator tmp(it) ;
++tmp ;
_pending_messages.erase(it) ;
it = tmp ;
}
else
++it ;
// look into pending items.
#ifdef GROUTER_DEBUG
grouter_debug() << " Pending messages to route : " << _pending_messages.size() << std::endl;
#endif
}
for(std::map<GRouterMsgPropagationId,GRouterClientService*>::const_iterator it(cancelled_msgs.begin());it!=cancelled_msgs.end();++it)
it->second->notifyDataStatus(it->first, GROUTER_CLIENT_SERVICE_DATA_STATUS_FAILED) ;
}
bool p3GRouter::registerKey(const RsGxsId& authentication_key,const GRouterServiceId& client_id,const std::string& description)
{
RS_STACK_MUTEX(grMtx) ;
@ -653,7 +592,8 @@ void p3GRouter::addVirtualPeer(const TurtleFileHash& hash,const TurtleVirtualPee
#endif
it->second.tunnel_status = RS_GROUTER_TUNNEL_STATUS_READY ;
found = true ;
break ;
// don't break here, because we might send multiple items though the same tunnel.
}
if(!found)
@ -700,7 +640,15 @@ void p3GRouter::removeVirtualPeer(const TurtleFileHash& hash,const TurtleVirtual
#endif
_virtual_peers.erase(it) ;
}
#ifdef GROUTER_DEBUG
std::cerr << " setting tunnel status in pending message." << std::endl;
#endif
for(std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::iterator it2(_pending_messages.begin());it2!=_pending_messages.end();++it2)
if(it2->second.tunnel_hash == hash && it->second.virtual_peers.empty())
it2->second.tunnel_status = RS_GROUTER_TUNNEL_STATUS_PENDING ;
}
void p3GRouter::connectToTurtleRouter(p3turtle *pt)
{
mTurtle = pt ;
@ -801,12 +749,24 @@ if(!_pending_messages.empty())
grouter_debug() << std::endl;
#endif
}
else if(it->second.data_status == RS_GROUTER_DATA_STATUS_RECEIPT_OK)
else if(it->second.data_status == RS_GROUTER_DATA_STATUS_RECEIPT_OK )
{
#ifdef GROUTER_DEBUG
std::cerr << " closing pending tunnels." << std::endl;
#endif
mTurtle->stopMonitoringTunnels(it->second.tunnel_hash) ;
it->second.tunnel_status = RS_GROUTER_TUNNEL_STATUS_UNMANAGED ;
}
else if(it->second.data_status == RS_GROUTER_DATA_STATUS_SENT && it->second.last_sent_TS + MAX_RECEIPT_WAIT_TIME < now)
{
#ifdef GROUTER_DEBUG
std::cerr << " closing pending tunnels." << std::endl;
#endif
mTurtle->stopMonitoringTunnels(it->second.tunnel_hash) ;
it->second.tunnel_status = RS_GROUTER_TUNNEL_STATUS_UNMANAGED ;
it->second.data_status = RS_GROUTER_DATA_STATUS_PENDING ;
}
#ifdef GROUTER_DEBUG
else
@ -840,16 +800,17 @@ void p3GRouter::routePendingObjects()
// Go throught he list of pending messages.
// For those with a tunnel ready, send the message in the tunnel.
RS_STACK_MUTEX(grMtx) ;
time_t now = time(NULL) ;
std::map<GRouterMsgPropagationId,std::pair<GRouterClientService *,uint32_t> > notified_msgs ;
{
RS_STACK_MUTEX(grMtx) ;
#ifdef GROUTER_DEBUG
if(!_pending_messages.empty())
std::cerr << "p3GRouter::routePendingObjects()" << std::endl;
#endif
for(std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it=_pending_messages.begin();it!=_pending_messages.end();++it)
for(std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it=_pending_messages.begin();it!=_pending_messages.end();)
if(it->second.data_status == RS_GROUTER_DATA_STATUS_PENDING && it->second.tunnel_status == RS_GROUTER_TUNNEL_STATUS_READY && now > it->second.last_sent_TS + MAX_DELAY_BETWEEN_TWO_SEND)
{
#ifdef GROUTER_DEBUG
@ -864,6 +825,7 @@ void p3GRouter::routePendingObjects()
#ifdef GROUTER_DEBUG
std::cerr << ". No virtual peers. Skipping now." << std::endl;
#endif
++it ;
continue ;
}
@ -872,6 +834,7 @@ void p3GRouter::routePendingObjects()
#ifdef GROUTER_DEBUG
std::cerr << ". Still waiting delay (stabilisation)." << std::endl;
#endif
++it ;
continue ;
}
@ -886,6 +849,7 @@ void p3GRouter::routePendingObjects()
#ifdef GROUTER_DEBUG
std::cerr << " no peers available. Cannot send!!" << std::endl;
#endif
++it ;
continue ;
}
TurtleVirtualPeerId vpid = (vpit->second.virtual_peers.begin())->first ;
@ -901,10 +865,48 @@ void p3GRouter::routePendingObjects()
#endif
it->second.last_sent_TS = now ;
}
// Also route back some ACKs if necessary.
// [..]
++it ;
}
else if(it->second.data_status == RS_GROUTER_DATA_STATUS_RECEIPT_OK || it->second.received_time_TS + GROUTER_ITEM_MAX_CACHE_KEEP_TIME < now) // is the item too old for cache
{
#ifdef GROUTER_DEBUG
if(it->second.data_status == RS_GROUTER_DATA_STATUS_RECEIPT_OK)
grouter_debug() << " Removing received cached item " << std::hex << it->first << std::dec << std::endl;
else
grouter_debug() << " Removing too-old cached item " << std::hex << it->first << std::dec << std::endl;
#endif
GRouterClientService *client = NULL ;
GRouterServiceId service_id = 0;
uint32_t status = (it->second.data_status == RS_GROUTER_DATA_STATUS_RECEIPT_OK)?GROUTER_CLIENT_SERVICE_DATA_STATUS_RECEIVED:GROUTER_CLIENT_SERVICE_DATA_STATUS_FAILED ;
if(!locked_getClientAndServiceId(it->second.tunnel_hash,it->second.data_item->destination_key,client,service_id))
std::cerr << " ERROR: cannot find client for cancelled message " << it->first << std::endl;
else
notified_msgs[it->first] = std::make_pair(client,status) ;
delete it->second.data_item ;
if(it->second.receipt_item != NULL)
delete it->second.receipt_item ;
std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::iterator tmp(it) ;
++tmp ;
_pending_messages.erase(it) ;
it = tmp ;
}
else
++it ;
}
// look into pending items.
#ifdef GROUTER_DEBUG
grouter_debug() << " Pending messages to route : " << _pending_messages.size() << std::endl;
#endif
for(std::map<GRouterMsgPropagationId,std::pair<GRouterClientService*,uint32_t> >::const_iterator it(notified_msgs.begin());it!=notified_msgs.end();++it)
it->second.first->notifyDataStatus(it->first, it->second.second) ;
}
bool p3GRouter::sendDataInTunnel(const TurtleVirtualPeerId& vpid,RsGRouterAbstractMsgItem *item)
@ -1051,13 +1053,10 @@ void p3GRouter::handleIncomingReceiptItem(const TurtleFileHash& hash,RsGRouterSi
std::cerr << " removing messsage from cache." << std::endl;
#endif
delete it->second.data_item ;
//delete it->second.receipt_item ;
_pending_messages.erase(it) ;
it->second.data_status = RS_GROUTER_DATA_STATUS_RECEIPT_OK;
it->second.receipt_item = receipt_item->duplicate() ;
changed = true ;
//it->second.receipt_item = signed_receipt_item ;
}
#ifdef GROUTER_DEBUG
std::cerr << " notifying client that the msg was received." << std::endl;
@ -1065,25 +1064,6 @@ void p3GRouter::handleIncomingReceiptItem(const TurtleFileHash& hash,RsGRouterSi
if(changed)
IndicateConfigChanged() ;
GRouterClientService *client = NULL ;
GRouterServiceId service_id = 0;
{
RS_STACK_MUTEX (grMtx) ;
if(!locked_getClientAndServiceId(hash,receipt_item->signature.keyId,client,service_id))
{
std::cerr << " ERROR: cannot find client service for this hash/key combination." << std::endl;
return ;
}
}
#ifdef GROUTER_DEBUG
std::cerr << " retrieved client " << (void*)client << ", service_id=" << std::hex << service_id << std::dec << std::endl;
std::cerr << " acknowledging client for data received" << std::endl;
#endif
client->notifyDataStatus(receipt_item->routing_id,GROUTER_CLIENT_SERVICE_DATA_STATUS_RECEIVED) ;
}
void p3GRouter::handleIncomingDataItem(const TurtleFileHash& hash,RsGRouterGenericDataItem *generic_item)
@ -1485,6 +1465,7 @@ bool p3GRouter::sendData(const RsGxsId& destination,const GRouterServiceId& clie
time_t now = time(NULL) ;
info.data_item = data_item ;
info.receipt_item = NULL ;
info.data_status = RS_GROUTER_DATA_STATUS_PENDING ;
info.tunnel_status = RS_GROUTER_TUNNEL_STATUS_UNMANAGED ;
info.last_sent_TS = 0 ;
@ -1631,7 +1612,7 @@ bool p3GRouter::getRoutingMatrixInfo(RsGRouter::GRouterRoutingMatrixInfo& info)
std::set<RsPeerId> ids ;
mServiceControl->getPeersConnected(getServiceInfo().mServiceType,ids) ;
//info.published_keys = _owned_key_ids ;
info.published_keys = _owned_key_ids ;
for(std::set<RsPeerId>::const_iterator it(ids.begin());it!=ids.end();++it)
info.friend_ids.push_back(*it) ;
@ -1650,22 +1631,29 @@ bool p3GRouter::getRoutingMatrixInfo(RsGRouter::GRouterRoutingMatrixInfo& info)
}
bool p3GRouter::getRoutingCacheInfo(std::vector<GRouterRoutingCacheInfo>& infos)
{
RS_STACK_MUTEX(grMtx) ;
RS_STACK_MUTEX(grMtx) ;
infos.clear() ;
for(std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::const_iterator it(_pending_messages.begin());it!=_pending_messages.end();++it)
{
infos.push_back(GRouterRoutingCacheInfo()) ;
GRouterRoutingCacheInfo& cinfo(infos.back()) ;
for(std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::const_iterator it(_pending_messages.begin());it!=_pending_messages.end();++it)
{
GRouterRoutingCacheInfo cinfo ;
cinfo.mid = it->first ;
cinfo.mid = it->first ;
cinfo.local_origin.clear() ; // not used before we implement proxys
cinfo.destination = it->second.data_item->destination_key ;
cinfo.time_stamp = it->second.received_time_TS ;
cinfo.status = it->second.data_status;
cinfo.data_size = it->second.data_item->data_size ;
}
return true ;
cinfo.routing_time = it->second.received_time_TS ;
cinfo.last_tunnel_attempt_time = it->second.last_tunnel_request_TS ;
cinfo.last_sent_time = it->second.last_sent_TS ;
cinfo.receipt_available = (it->second.receipt_item != NULL);
cinfo.data_status = it->second.data_status ;
cinfo.tunnel_status = it->second.tunnel_status ;
cinfo.data_size = it->second.data_item->data_size ;
cinfo.data_hash = RsDirUtil::sha1sum(it->second.data_item->data_bytes,it->second.data_item->data_size) ;
infos.push_back(cinfo) ;
}
return true ;
}
// Dump everything

View file

@ -174,7 +174,6 @@ protected:
//===================================================//
// Calls
// - autoWash()
// - packet handling methods
// - matrix updates
//
@ -205,7 +204,6 @@ private:
return _debug_enabled?(std::cerr):null;
}
void autoWash() ;
void routePendingObjects() ;
void handleTunnels() ;