fixed a few bugs in global router, eliminated duplicate messages, improved routing logic

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@8173 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2015-04-26 19:10:46 +00:00
parent 67ced81660
commit ee2bb70b0e
2 changed files with 215 additions and 139 deletions

View File

@ -47,6 +47,7 @@ static const float RS_GROUTER_BASE_WEIGHT_GXS_PACKET = 0.1f ; // base c
static const uint32_t MAX_TUNNEL_WAIT_TIME = 60 ; // wait for 60 seconds at most for a tunnel response.
static const uint32_t MAX_TUNNEL_UNMANAGED_TIME = 600 ; // min time before retry tunnels for that msg.
static const uint32_t MAX_DELAY_FOR_RESEND = 2*86400+300 ; // re-send if held for more than 2 days (cache store period) plus security delay.
static const uint32_t MAX_DESTINATION_KEEP_TIME = 20*86400 ; // keep for 20 days in destination cache to avoid re-
static const uint32_t TUNNEL_OK_WAIT_TIME = 2 ; // wait for 2 seconds after last tunnel ok, so that we have a complete set of tunnels.
static const uint32_t MAX_GROUTER_DATA_SIZE = 2*1024*1024 ; // 2MB size limit. This is of course arbitrary.
static const uint32_t MAX_TRANSACTION_ACK_WAITING_TIME = 60 ; // wait for at most 60 secs for a ACK. If not restart the transaction.

View File

@ -200,8 +200,6 @@
//#define GROUTER_DEBUG
/**********************/
const std::string p3GRouter::SERVICE_INFO_APP_NAME = "Global Router" ;
p3GRouter::p3GRouter(p3ServiceControl *sc, RsGixs *is)
@ -840,6 +838,7 @@ void p3GRouter::routePendingObjects()
if(!_pending_messages.empty())
std::cerr << "p3GRouter::routePendingObjects()" << std::endl;
#endif
bool pending_messages_changed = false ;
for(std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it=_pending_messages.begin();it!=_pending_messages.end();++it)
{
@ -847,58 +846,60 @@ void p3GRouter::routePendingObjects()
std::cerr << " message " << std::hex << it->first << std::dec << std::endl;
#endif
if(it->second.data_status == RS_GROUTER_DATA_STATUS_PENDING)
{
// Look for tunnels and friends where to send the data. Send to both.
std::list<RsPeerId> peers ;
if(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS)
locked_collectAvailableTunnels(it->second.tunnel_hash,peers);
// For now, disable friends. We'll first check that the good old tunnel system works as before.
if(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS)
locked_collectAvailableFriends(it->second.data_item->destination_key,peers, it->second.incoming_routes.ids, it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN);
if(peers.empty())
{
#ifdef GROUTER_DEBUG
std::cerr << " no direct friends available" << std::endl;
#endif
// Look for tunnels and friends where to send the data. Send to both.
if(it->second.received_time_TS + DIRECT_FRIEND_TRY_DELAY < now && !(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS))
std::list<RsPeerId> peers ;
if(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS)
locked_collectAvailableTunnels(it->second.tunnel_hash,peers);
// For now, disable friends. We'll first check that the good old tunnel system works as before.
if(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS)
locked_collectAvailableFriends(it->second.data_item->destination_key,peers, it->second.incoming_routes.ids, it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN);
if(peers.empty())
{
#ifdef GROUTER_DEBUG
std::cerr << " enabling tunnels for this message." << std::endl;
std::cerr << " no direct friends available" << std::endl;
#endif
it->second.routing_flags |= GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS ;
if(it->second.received_time_TS + DIRECT_FRIEND_TRY_DELAY < now && !(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS))
{
#ifdef GROUTER_DEBUG
std::cerr << " enabling tunnels for this message." << std::endl;
#endif
it->second.routing_flags |= GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS ;
}
continue ;
}
continue ;
}
// slice the data appropriately and send.
// slice the data appropriately and send.
std::list<RsGRouterTransactionChunkItem*> chunks ;
sliceDataItem(it->second.data_item,chunks) ;
std::list<RsGRouterTransactionChunkItem*> chunks ;
sliceDataItem(it->second.data_item,chunks) ;
#ifdef GROUTER_DEBUG
if(!peers.empty())
std::cerr << " sending to peers:" << std::endl;
if(!peers.empty())
std::cerr << " sending to peers:" << std::endl;
#endif
for(std::list<RsPeerId>::const_iterator itpid(peers.begin());itpid!=peers.end();++itpid)
for(std::list<RsGRouterTransactionChunkItem*>::const_iterator it2(chunks.begin());it2!=chunks.end();++it2)
locked_sendTransactionData(*itpid,*(*it2) ) ;
for(std::list<RsPeerId>::const_iterator itpid(peers.begin());itpid!=peers.end();++itpid)
for(std::list<RsGRouterTransactionChunkItem*>::const_iterator it2(chunks.begin());it2!=chunks.end();++it2)
locked_sendTransactionData(*itpid,*(*it2) ) ;
// delete temporary items
// delete temporary items
for(std::list<RsGRouterTransactionChunkItem*>::const_iterator cit=chunks.begin();cit!=chunks.end();++cit)
delete *cit;
for(std::list<RsGRouterTransactionChunkItem*>::const_iterator cit=chunks.begin();cit!=chunks.end();++cit)
delete *cit;
// change item state in waiting list
// change item state in waiting list
it->second.data_status = RS_GROUTER_DATA_STATUS_ONGOING ;
it->second.data_transaction_TS = now ;
}
it->second.data_status = RS_GROUTER_DATA_STATUS_ONGOING ;
it->second.data_transaction_TS = now ;
pending_messages_changed = true ;
}
else if(it->second.data_status == RS_GROUTER_DATA_STATUS_ONGOING && now > MAX_TRANSACTION_ACK_WAITING_TIME + it->second.data_transaction_TS)
{
#ifdef GROUTER_DEBUG
@ -908,23 +909,27 @@ void p3GRouter::routePendingObjects()
it->second.data_status = RS_GROUTER_DATA_STATUS_PENDING ;
}
else if(it->second.data_status == RS_GROUTER_DATA_STATUS_SENT)
{
if( (it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN) && it->second.last_sent_TS + MAX_DELAY_FOR_RESEND < now)
{
if( (it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN) && it->second.last_sent_TS + MAX_DELAY_FOR_RESEND < now)
{
#ifdef GROUTER_DEBUG
std::cerr << " item was not received. Re-setting status to PENDING" << std::endl;
std::cerr << " item was not received. Re-setting status to PENDING" << std::endl;
#endif
it->second.data_status = RS_GROUTER_DATA_STATUS_PENDING ;
}
else
{
it->second.data_status = RS_GROUTER_DATA_STATUS_PENDING ;
}
else
{
#ifdef GROUTER_DEBUG
std::cerr << " item was sent. Desactivating tunnels." << std::endl;
std::cerr << " item was sent. Desactivating tunnels." << std::endl;
#endif
it->second.routing_flags &= ~GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS ;
it->second.routing_flags &= ~GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS ;
}
}
}
else if(it->second.data_status == RS_GROUTER_DATA_STATUS_RECEIPT_OK)
// We treat this case apart, so as to make sure that receipt items are always forwarded wen possible even if the data_status
// is not set correctly.
if(it->second.receipt_item != NULL && !it->second.incoming_routes.ids.empty())
{
// send the receipt through all incoming routes, as soon as it gets delivered.
@ -933,7 +938,6 @@ void p3GRouter::routePendingObjects()
#endif
std::list<RsGRouterTransactionChunkItem*> chunks ;
sliceDataItem(it->second.receipt_item,chunks) ;
for(std::set<RsPeerId>::iterator it2=it->second.incoming_routes.ids.begin();it2!=it->second.incoming_routes.ids.end();)
if(mServiceControl->isPeerConnected(getServiceInfo().mServiceType,*it2) || mTurtle->isTurtlePeer(*it2))
@ -941,6 +945,8 @@ void p3GRouter::routePendingObjects()
#ifdef GROUTER_DEBUG
std::cerr << " sending receipt back to " << *it2 << " which is online." << std::endl;
#endif
if(chunks.empty())
sliceDataItem(it->second.receipt_item,chunks) ;
for(std::list<RsGRouterTransactionChunkItem*>::const_iterator it3(chunks.begin());it3!=chunks.end();++it3)
locked_sendTransactionData(*it2,*(*it3) ) ;
@ -950,6 +956,8 @@ void p3GRouter::routePendingObjects()
++it2tmp ;
it->second.incoming_routes.ids.erase(it2) ;
it2 = it2tmp ;
pending_messages_changed = true ;
}
else
++it2 ;
@ -965,6 +973,9 @@ void p3GRouter::routePendingObjects()
it->second.data_status = RS_GROUTER_DATA_STATUS_DONE ;
}
}
if(pending_messages_changed)
IndicateConfigChanged() ;
}
void p3GRouter::locked_collectAvailableFriends(const GRouterKeyId& gxs_id,std::list<RsPeerId>& friend_peers,const std::set<RsPeerId>& incoming_routes,bool is_origin)
@ -1126,12 +1137,16 @@ void p3GRouter::autoWash()
RS_STACK_MUTEX(grMtx) ;
for(std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it=_pending_messages.begin();it!=_pending_messages.end();)
if( it->second.data_status == RS_GROUTER_DATA_STATUS_DONE
|| ((it->second.received_time_TS + GROUTER_ITEM_MAX_CACHE_KEEP_TIME < now)
&& !(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN))) // is the item too old for cache
if( (it->second.data_status == RS_GROUTER_DATA_STATUS_DONE &&
(!(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_DESTINATION)
|| it->second.received_time_TS + MAX_DESTINATION_KEEP_TIME < now))
|| ((it->second.received_time_TS + GROUTER_ITEM_MAX_CACHE_KEEP_TIME < now)
&& !(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN)
&& !(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_DESTINATION)
)) // is the item too old for cache
{
#ifdef GROUTER_DEBUG
grouter_debug() << " Removing too-old cached item " << std::hex << it->first << std::dec << std::endl;
grouter_debug() << " Removing cached item " << std::hex << it->first << std::dec << std::endl;
#endif
GRouterClientService *client = NULL ;
GRouterServiceId service_id = 0;
@ -1145,6 +1160,7 @@ void p3GRouter::autoWash()
}
delete it->second.data_item ;
if(it->second.receipt_item != NULL)
delete it->second.receipt_item ;
@ -1423,12 +1439,29 @@ void p3GRouter::handleIncomingDataItem(RsGRouterGenericDataItem *data_item)
data_item->print(std::cerr,2) ;
#endif
// we find 3 things:
// A - is the item for us ?
// B - signature and hash check ?
// C - item is already known ?
// Store the item? if !C
// Send a receipt? if A && B
// Notify client? if A && !C
//
GRouterClientService *client = NULL ;
GRouterServiceId service_id = data_item->service_id ;
RsGRouterSignedReceiptItem *receipt_item = NULL ;
bool item_is_for_us = false ;
// Find client and service ID from destination key.
Sha1CheckSum item_hash = computeDataItemHash(data_item) ;
bool item_is_already_known = false ;
bool item_is_for_us = false ;
bool cache_has_changed = false ;
// A - Find client and service ID from destination key.
#ifdef GROUTER_DEBUG
std::cerr << " step A: find if the item is for us or not, and whether it's aready in cache or not." << std::endl;
#endif
{
RS_STACK_MUTEX(grMtx) ;
@ -1436,7 +1469,7 @@ void p3GRouter::handleIncomingDataItem(RsGRouterGenericDataItem *data_item)
if(its == _registered_services.end())
{
std::cerr << " ERROR: client id " << service_id << " not registered. Consistency error." << std::endl;
std::cerr << " ERROR: client id " << service_id << " not registered. Consistency error." << std::endl;
return ;
}
client = its->second ;
@ -1444,126 +1477,170 @@ void p3GRouter::handleIncomingDataItem(RsGRouterGenericDataItem *data_item)
// also check wether this item is for us or not
item_is_for_us = _owned_key_ids.find( makeTunnelHash(data_item->destination_key,service_id) ) != _owned_key_ids.end() ;
}
if(item_is_for_us)
#ifdef GROUTER_DEBUG
std::cerr << " item is " << (item_is_for_us?"":"not") << " for us." << std::endl;
#endif
std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::iterator it = _pending_messages.find(data_item->routing_id) ;
if(it != _pending_messages.end())
{
#ifdef GROUTER_DEBUG
std::cerr << " item is for us. Checking signature." << std::endl;
#endif
if(verifySignedDataItem(data_item)) // we should get proper flags out of this
if(it->second.item_hash != item_hash)
{
#ifdef GROUTER_DEBUG
std::cerr << " verifying item signature: CHECKED!" ;
std::cerr << " ERROR: item is already known but data hash does not match. Dropping that item." << std::endl;
#endif
}
#ifdef GROUTER_DEBUG
else
std::cerr << " verifying item signature: FAILED!" ;
#endif
// compute the hash before decryption.
Sha1CheckSum data_hash = computeDataItemHash(data_item) ;
if(!decryptDataItem(data_item))
{
std::cerr << " decrypting item : FAILED! Item will be dropped." << std::endl;
return ;
}
item_is_already_known = true ;
receipt_item = it->second.receipt_item ;
#ifdef GROUTER_DEBUG
std::cerr << " item is already in cache." << std::endl;
#endif
}
#ifdef GROUTER_DEBUG
else
std::cerr << " item is new." << std::endl;
#endif
}
// At this point, if item is already known, it is guarrantied to be identical to the stored item.
// If the item is for us, and not already known, check the signature and hash, and generate a signed receipt
if(item_is_for_us && !item_is_already_known)
{
#ifdef GROUTER_DEBUG
std::cerr << " step B: item is for us and is new, so make sure it's authentic and create a receipt" << std::endl;
#endif
if(!verifySignedDataItem(data_item)) // we should get proper flags out of this
{
#ifdef GROUTER_DEBUG
std::cerr << " verifying item signature: FAILED! Droping that item" ;
#endif
return ;
}
#ifdef GROUTER_DEBUG
else
std::cerr << " decrypting item : OK!" << std::endl;
std::cerr << " verifying item signature: CHECKED!" ;
#endif
// make a copy of the data, since the item will be deleted.
uint8_t *data_copy = (uint8_t*)malloc(data_item->data_size) ;
memcpy(data_copy,data_item->data_bytes,data_item->data_size) ;
client->receiveGRouterData(data_item->destination_key,data_item->signature.keyId,service_id,data_copy,data_item->data_size);
// No we need to send a signed receipt to the sender.
receipt_item = new RsGRouterSignedReceiptItem;
receipt_item->data_hash = data_hash ;
receipt_item->data_hash = item_hash ;
receipt_item->routing_id = data_item->routing_id ;
receipt_item->destination_key = data_item->signature.keyId ;
receipt_item->flags = 0 ;
#ifdef GROUTER_DEBUG
std::cerr << " preparing signed receipt." << std::endl;
std::cerr << " preparing signed receipt." << std::endl;
#endif
if(!signDataItem(receipt_item,data_item->destination_key))
{
std::cerr << " signing: FAILED. Receipt dropped. ERROR." << std::endl;
std::cerr << " signing: FAILED. Receipt dropped. ERROR. Packet dropped as well." << std::endl;
return ;
}
#ifdef GROUTER_DEBUG
std::cerr << " signing: OK." << std::endl;
std::cerr << " signing: OK." << std::endl;
#endif
}
#ifdef GROUTER_DEBUG
else
std::cerr << " item is not for us. Storing/forwarding." << std::endl;
std::cerr << " step B: skipped, since item is not for us, or already known." << std::endl;
#endif
// locally store the item, even if we're the destination
RS_STACK_MUTEX(grMtx) ;
// item is not for us. We store it in pending messages and will deal with it later
std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::iterator it = _pending_messages.find(data_item->routing_id) ;
Sha1CheckSum item_hash = computeDataItemHash(data_item) ;
if(it == _pending_messages.end())
{
// now store the item in _pending_messages whether it is for us or not (if the item is for us, this prevents receiving multiple
// copies of the message)
#ifdef GROUTER_DEBUG
std::cerr << " item is new. Storing it and forwarding it." << std::endl;
std::cerr << " step C: store the item is cache." << std::endl;
#endif
{
RS_STACK_MUTEX(grMtx) ;
GRouterRoutingInfo& info(_pending_messages[data_item->routing_id]) ;
info.data_item = data_item->duplicate() ;
info.receipt_item = receipt_item ; // inited before, or NULL.
info.tunnel_status = RS_GROUTER_TUNNEL_STATUS_UNMANAGED ;
info.last_sent_TS = 0 ;
info.item_hash = item_hash ;
info.last_tunnel_request_TS = 0 ;
info.sending_attempts = 0 ;
info.received_time_TS = time(NULL) ;
info.tunnel_hash = makeTunnelHash(data_item->destination_key,data_item->service_id) ;
if(item_is_for_us)
if(info.data_item == NULL) // item is not for us
{
info.routing_flags = GRouterRoutingInfo::ROUTING_FLAGS_IS_DESTINATION | GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS ;
info.data_status = RS_GROUTER_DATA_STATUS_DONE ;
}
else
{
info.routing_flags = GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS ; // don't allow tunnels just yet
info.data_status = RS_GROUTER_DATA_STATUS_PENDING ;
}
}
else if(item_hash != it->second.item_hash) // make sure that the received item is identical to the one that is being saved already.
{
std::cerr << " (EE) reveived an item with known ID but different data hash. Is that an attack?? Item will be dropped." << std::endl;
return ;
}
else
receipt_item = it->second.receipt_item ;
#ifdef GROUTER_DEBUG
std::cerr << " storing incoming route: from " << data_item->PeerId() << std::endl;
std::cerr << " item is new. Storing it." << std::endl;
#endif
_pending_messages[data_item->routing_id].incoming_routes.ids.insert(data_item->PeerId()) ;
info.data_item = data_item->duplicate() ;
info.receipt_item = receipt_item ; // inited before, or NULL.
info.tunnel_status = RS_GROUTER_TUNNEL_STATUS_UNMANAGED ;
info.last_sent_TS = 0 ;
info.item_hash = item_hash ;
info.last_tunnel_request_TS = 0 ;
info.sending_attempts = 0 ;
info.received_time_TS = time(NULL) ;
info.tunnel_hash = makeTunnelHash(data_item->destination_key,data_item->service_id) ;
if(receipt_item != NULL)
_pending_messages[data_item->routing_id].data_status = RS_GROUTER_DATA_STATUS_RECEIPT_OK ;
if(item_is_for_us)
{
// don't store if item is for us. No need to take that much memory.
free(info.data_item->data_bytes) ;
info.data_item->data_size = 0 ;
info.data_item->data_bytes = NULL ;
info.routing_flags = GRouterRoutingInfo::ROUTING_FLAGS_IS_DESTINATION | GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS ;
info.data_status = RS_GROUTER_DATA_STATUS_RECEIPT_OK ;
}
else
{
info.routing_flags = GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS ; // don't allow tunnels just yet
info.data_status = RS_GROUTER_DATA_STATUS_PENDING ;
}
}
#ifdef GROUTER_DEBUG
else
std::cerr << " item is already in cache." << std::endl;
std::cerr << " storing incoming route from " << data_item->PeerId() << std::endl;
#endif
info.incoming_routes.ids.insert(data_item->PeerId()) ;
cache_has_changed = true ;
}
// if the item is for us and is not already known, notify the client.
if(item_is_for_us && !item_is_already_known)
{
// compute the hash before decryption.
#ifdef GROUTER_DEBUG
std::cerr << " step D: item is for us and is new: decrypting and notifying client." << std::endl;
#endif
RsGRouterGenericDataItem *decrypted_item = data_item->duplicate() ;
if(!decryptDataItem(decrypted_item))
{
std::cerr << " decrypting item : FAILED! Item cannot be passed to the client." << std::endl;
delete decrypted_item ;
return ;
}
#ifdef GROUTER_DEBUG
else
std::cerr << " decrypting item : OK!" << std::endl;
std::cerr << " notyfying client." << std::endl;
#endif
client->receiveGRouterData(decrypted_item->destination_key,decrypted_item->signature.keyId,service_id,decrypted_item->data_bytes,decrypted_item->data_size);
decrypted_item->data_bytes = NULL ;
decrypted_item->data_size = 0 ;
delete decrypted_item ;
}
#ifdef GROUTER_DEBUG
else
std::cerr << " step D: item is not for us or not new: skipping this step." << std::endl;
#endif
if(cache_has_changed)
IndicateConfigChanged() ;
}
bool p3GRouter::locked_getClientAndServiceId(const TurtleFileHash& hash, const RsGxsId& destination_key, GRouterClientService *& client, GRouterServiceId& service_id)
@ -1676,8 +1753,6 @@ bool p3GRouter::decryptDataItem(RsGRouterGenericDataItem *item)
bool p3GRouter::signDataItem(RsGRouterAbstractMsgItem *item,const RsGxsId& signing_id)
{
uint8_t *data = NULL;
try
{
RsTlvSecurityKey signature_key ;
@ -2110,8 +2185,8 @@ void p3GRouter::debugDump()
grouter_debug() << " Routing matrix: " << std::endl;
if(_debug_enabled)
_routing_matrix.debugDump() ;
// if(_debug_enabled)
// _routing_matrix.debugDump() ;
}