saving ongoing work on global router. Fixed friend-to-friend message passing.

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@8128 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2015-04-12 21:43:52 +00:00
parent 2ecd582273
commit 6b841ba4e1
4 changed files with 128 additions and 41 deletions

View File

@ -211,6 +211,9 @@ RsGRouterRoutingInfoItem *RsGRouterSerialiser::deserialise_RsGRouterRoutingInfoI
ok &= getRawUInt32(data, pktsize, &offset, &item->client_id);
ok &= item->tunnel_hash.deserialise(data, pktsize, offset) ;
ok &= getRawUInt32(data, pktsize, &offset, &item->routing_flags) ;
ok &= item->incoming_routes.GetTlv(data,pktsize,&offset) ;
item->data_item = deserialise_RsGRouterGenericDataItem(&((uint8_t*)data)[offset],pktsize - offset) ;
if(item->data_item != NULL)
@ -252,7 +255,7 @@ RsGRouterMatrixFriendListItem *RsGRouterSerialiser::deserialise_RsGRouterMatrixF
uint32_t nb_friends = 0 ;
ok &= getRawUInt32(data, pktsize, &offset, &nb_friends); // file hash
item->reverse_friend_indices.resize(nb_friends) ;
item->reverse_friend_indices.resize(nb_friends) ;
for(uint32_t i=0;ok && i<nb_friends;++i)
ok &= item->reverse_friend_indices[i].deserialise(data, pktsize, offset) ;
@ -573,6 +576,9 @@ uint32_t RsGRouterRoutingInfoItem::serial_size() const
s += sizeof(GRouterServiceId) ; // service_id
s += tunnel_hash.serial_size() ;
s += 4 ; // routing_flags
s += incoming_routes.TlvSize() ; // incoming_routes
s += data_item->serial_size(); // data_item
if(receipt_item != NULL)
@ -668,6 +674,9 @@ bool RsGRouterRoutingInfoItem::serialise(void *data,uint32_t& size) const
ok &= setRawUInt32(data, tlvsize, &offset, client_id) ;
ok &= tunnel_hash.serialise(data, tlvsize, offset) ;
ok &= setRawUInt32(data, tlvsize, &offset, routing_flags) ;
ok &= incoming_routes.SetTlv(data,tlvsize,&offset) ;
uint32_t ns = size - offset ;
ok &= data_item->serialise( &((uint8_t*)data)[offset], ns) ;

View File

@ -48,9 +48,9 @@ const uint8_t RS_PKT_SUBTYPE_GROUTER_MATRIX_CLUES = 0x80 ; // it
const uint8_t RS_PKT_SUBTYPE_GROUTER_FRIENDS_LIST = 0x82 ; // item to save friend lists
const uint8_t RS_PKT_SUBTYPE_GROUTER_ROUTING_INFO_deprecated = 0x87 ; // deprecated. Don't use.
const uint8_t RS_PKT_SUBTYPE_GROUTER_ROUTING_INFO_deprecated2 = 0x88 ; // item to save routing info
const uint8_t RS_PKT_SUBTYPE_GROUTER_ROUTING_INFO = 0x89 ; // deprecated. Don't use.
const uint8_t RS_PKT_SUBTYPE_GROUTER_ROUTING_INFO = 0x90 ; // deprecated. Don't use.
const uint8_t QOS_PRIORITY_RS_GROUTER = 3 ; // relevant for items that travel through friends
const uint8_t QOS_PRIORITY_RS_GROUTER = 4 ; // relevant for items that travel through friends
/***********************************************************************************/

View File

@ -61,6 +61,7 @@ static const uint32_t RS_GROUTER_DATA_STATUS_PENDING = 0x0001 ; // item is
static const uint32_t RS_GROUTER_DATA_STATUS_SENT = 0x0002 ; // item is sent to tunnel or friend. No need to keep sending.
static const uint32_t RS_GROUTER_DATA_STATUS_RECEIPT_OK = 0x0003 ; // item is at destination.
static const uint32_t RS_GROUTER_DATA_STATUS_ONGOING = 0x0004 ; // transaction is ongoing.
static const uint32_t RS_GROUTER_DATA_STATUS_DONE = 0x0005 ; // receipt item has been forward to all routes. We can remove the item.
static const uint32_t RS_GROUTER_SENDING_STATUS_TUNNEL = 0x0001 ; // item was sent in a tunnel
static const uint32_t RS_GROUTER_SENDING_STATUS_FRIEND = 0x0002 ; // item was sent to a friend
@ -104,15 +105,15 @@ public:
GRouterServiceId client_id ; // service ID of the client. Only valid when origin==OwnId
TurtleFileHash tunnel_hash ; // tunnel hash to be used for this item
uint32_t routing_flags ;
RsGRouterGenericDataItem *data_item ;
RsGRouterSignedReceiptItem *receipt_item ;
std::set<RsPeerId> incoming_routes ;
RsTlvPeerIdSet incoming_routes ;
// non serialised data
uint32_t routing_flags ;
time_t data_transaction_TS ;
static const uint32_t ROUTING_FLAGS_ALLOW_TUNNELS = 0x0001;

View File

@ -254,7 +254,7 @@
#include "grouterclientservice.h"
/**********************/
//#define GROUTER_DEBUG
#define GROUTER_DEBUG
/**********************/
static const uint32_t MAX_TUNNEL_WAIT_TIME = 60 ; // wait for 60 seconds at most for a tunnel response.
@ -477,12 +477,14 @@ void p3GRouter::handleLowLevelTransactionChunkItem(RsGRouterTransactionChunkItem
std::cerr << " item is a transaction item." << std::endl;
#endif
RsPeerId pid = chunk_item->PeerId() ;
RsGRouterAbstractMsgItem *generic_item = NULL;
{
RS_STACK_MUTEX(grMtx) ;
generic_item = _incoming_data_pipes[chunk_item->PeerId()].addDataChunk(chunk_item) ; // addDataChunk takes ownership over chunk_item
generic_item->PeerId(chunk_item->PeerId()) ;
generic_item = _incoming_data_pipes[pid].addDataChunk(dynamic_cast<RsGRouterTransactionChunkItem*>(chunk_item->duplicate())) ;// addDataChunk takes ownership over chunk_item
generic_item->PeerId(pid) ;
}
// send to client off-mutex
@ -497,7 +499,7 @@ void p3GRouter::handleLowLevelTransactionChunkItem(RsGRouterTransactionChunkItem
RsGRouterTransactionAcknItem ackn_item ;
ackn_item.propagation_id = generic_item->routing_id ;
locked_sendTransactionData(chunk_item->PeerId(),ackn_item) ;
locked_sendTransactionData(pid,ackn_item) ;
{
RS_STACK_MUTEX(grMtx) ;
@ -526,8 +528,10 @@ void p3GRouter::handleLowLevelTransactionAckItem(RsGRouterTransactionAcknItem *t
std::cerr << " setting new status as sent/awaiting receipt." << std::endl;
#endif
}
#ifdef GROUTER_DEBUG
else
std::cerr << " ERROR: no routing ID corresponds to this ACK item. Inconsistency!" << std::endl;
std::cerr << " Note: no routing ID corresponds to this ACK item. This probably corresponds to a signed receipt" << std::endl;
#endif
}
void p3GRouter::receiveTurtleData(RsTurtleGenericTunnelItem *gitem,const RsFileHash& hash,const RsPeerId& virtual_peer_id,RsTurtleGenericTunnelItem::Direction direction)
@ -723,13 +727,13 @@ void p3GRouter::removeVirtualPeer(const TurtleFileHash& hash,const TurtleVirtual
#endif
_tunnels.erase(it) ;
}
// #ifdef GROUTER_DEBUG
// std::cerr << " setting tunnel status in pending message." << std::endl;
// #endif
//
// for(std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::iterator it2(_pending_messages.begin());it2!=_pending_messages.end();++it2)
// if(it2->second.tunnel_hash == hash && it->second.virtual_peers.empty())
// it2->second.tunnel_status = RS_GROUTER_TUNNEL_STATUS_PENDING ;
#ifdef GROUTER_DEBUG
std::cerr << " setting tunnel status in pending message." << std::endl;
#endif
for(std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::iterator it2(_pending_messages.begin());it2!=_pending_messages.end();++it2)
if(it2->second.tunnel_hash == hash && it->second.virtual_peers.empty())
it2->second.tunnel_status = RS_GROUTER_TUNNEL_STATUS_PENDING ;
}
void p3GRouter::connectToTurtleRouter(p3turtle *pt)
@ -862,12 +866,26 @@ if(!_pending_messages.empty())
else
std::cerr << " doing nothing." << std::endl;
#endif
// also check that all tunnels are actually active, to remove any old dead tunnels
if(it->second.tunnel_status == RS_GROUTER_TUNNEL_STATUS_READY)
{
std::map<TurtleFileHash,GRouterTunnelInfo>::iterator it2 = _tunnels.find(it->second.tunnel_hash) ;
if(it2 == _tunnels.end() || it2->second.virtual_peers.empty()) ;
{
std::cerr << " re-setting tunnel status to PENDING, as no tunnels are actually present." << std::endl;
it->second.tunnel_status = RS_GROUTER_TUNNEL_STATUS_PENDING ;
}
}
}
#ifdef GROUTER_DEBUG
if(!priority_list.empty())
grouter_debug() << " sorting..." << std::endl;
#endif
std::sort(priority_list.begin(),priority_list.end(),item_comparator_001()) ;
// take tunnels from item priority list, and enable tunnel handling, while respecting max number of active tunnels limit
@ -919,13 +937,14 @@ void p3GRouter::routePendingObjects()
std::list<RsPeerId> peers ;
// For now, disable tunnels. We'll first check that the good old tunnel system works as before.
if(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS)
locked_collectAvailableTunnels(it->second.tunnel_hash,peers);
locked_collectAvailableTunnels(it->second.tunnel_hash,peers);
// For now, disable friends. We'll first check that the good old tunnel system works as before.
//
// if(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS)
// locked_collectAvailableFriends(it->second.data_item->destination_key,peers, it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN);
// if(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS)
// locked_collectAvailableFriends(it->second.data_item->destination_key,peers, it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN);
if(peers.empty())
{
@ -942,9 +961,9 @@ void p3GRouter::routePendingObjects()
if(!peers.empty())
std::cerr << " sending to peers:" << std::endl;
#endif
for(std::list<RsPeerId>::const_iterator itpid(peers.begin());itpid!=peers.end();++itpid)
for(std::list<RsGRouterTransactionChunkItem*>::const_iterator it2(chunks.begin());it2!=chunks.end();++it2)
locked_sendTransactionData(*itpid,*(*it2) ) ;
for(std::list<RsPeerId>::const_iterator itpid(peers.begin());itpid!=peers.end();++itpid)
for(std::list<RsGRouterTransactionChunkItem*>::const_iterator it2(chunks.begin());it2!=chunks.end();++it2)
locked_sendTransactionData(*itpid,*(*it2) ) ;
// delete temporary items
@ -956,12 +975,48 @@ void p3GRouter::routePendingObjects()
it->second.data_status = RS_GROUTER_DATA_STATUS_ONGOING ;
it->second.data_transaction_TS = now ; ;
}
else if(it->second.data_status == RS_GROUTER_DATA_STATUS_ONGOING && now > MAX_TRANSACTION_ACK_WAITING_TIME + it->second.data_transaction_TS)
{
std::cerr << " waited too long for this transation. Switching back to PENDING." << std::endl;
else if(it->second.data_status == RS_GROUTER_DATA_STATUS_ONGOING && now > MAX_TRANSACTION_ACK_WAITING_TIME + it->second.data_transaction_TS)
{
std::cerr << " waited too long for this transation. Switching back to PENDING." << std::endl;
it->second.data_status = RS_GROUTER_DATA_STATUS_PENDING ;
}
it->second.data_status = RS_GROUTER_DATA_STATUS_PENDING ;
}
else if(it->second.data_status == RS_GROUTER_DATA_STATUS_RECEIPT_OK)
{
// send the receipt through all incoming routes, as soon as it gets delivered.
std::cerr << " receipt should be sent back. Trying all incoming routes..." << std::endl;
std::list<RsGRouterTransactionChunkItem*> chunks ;
sliceDataItem(it->second.receipt_item,chunks) ;
for(std::list<RsPeerId>::iterator it2=it->second.incoming_routes.ids.begin();it2!=it->second.incoming_routes.ids.end();)
if(mServiceControl->isPeerConnected(getServiceInfo().mServiceType,*it2))
{
std::cerr << " sending receipt back to " << *it2 << " which is online." << std::endl;
for(std::list<RsGRouterTransactionChunkItem*>::const_iterator it3(chunks.begin());it3!=chunks.end();++it3)
locked_sendTransactionData(*it2,*(*it3) ) ;
// then remove from the set.
std::list<RsPeerId>::iterator it2tmp = it2 ;
++it2tmp ;
it->second.incoming_routes.ids.erase(it2) ;
it2 = it2tmp ;
}
else
++it2 ;
for(std::list<RsGRouterTransactionChunkItem*>::const_iterator cit=chunks.begin();cit!=chunks.end();++cit)
delete *cit;
// Because signed receipts are small items, we take the bet that if the item could be sent, then it was received.
// otherwise, we should mark that incomng route as being handled, wait for the ACK and deal with it by updating
// it->second.data_status at that time.
if(it->second.incoming_routes.ids.empty())
it->second.data_status = RS_GROUTER_DATA_STATUS_DONE ;
}
}
}
@ -1087,6 +1142,7 @@ bool p3GRouter::locked_sendTransactionData(const RsPeerId& pid,const RsGRouterTr
std::cerr << " sending to pid " << pid << std::endl;
#endif
RsGRouterTransactionItem *item_copy = trans_item.duplicate() ;
item_copy->PeerId(pid) ;
sendItem(item_copy) ;
@ -1137,15 +1193,32 @@ void p3GRouter::autoWash()
}
else
++it ;
// also check all existing tunnels
for(std::map<TurtleFileHash,GRouterTunnelInfo>::iterator it = _tunnels.begin();it!=_tunnels.end();++it)
{
std::list<TurtleVirtualPeerId> vpids_to_remove ;
for(std::set<TurtleVirtualPeerId>::iterator it2 = it->second.virtual_peers.begin();it2!=it->second.virtual_peers.end();++it2)
if(!mTurtle->isTurtlePeer(*it2))
{
vpids_to_remove.push_back(*it2) ;
std::cerr << " " << *it2 << " is not an active tunnel for hash " << it->first << ". Removing virtual peer id." << std::endl;
}
for(std::list<TurtleVirtualPeerId>::const_iterator it2=vpids_to_remove.begin();it2!=vpids_to_remove.end();++it2)
it->second.removeVirtualPeer(*it2) ;
}
}
// look into pending items.
#ifdef GROUTER_DEBUG
grouter_debug() << " Pending messages to route : " << _pending_messages.size() << std::endl;
#endif
#warning move the notification for received messages in the handlign function of signed receipts. Keep the notification for unsent here.
for(std::map<GRouterMsgPropagationId,std::pair<GRouterClientService*,uint32_t> >::const_iterator it(notified_msgs.begin());it!=notified_msgs.end();++it)
{
std::cerr << " notifying client for message id " << std::hex << it->first << " state = " << it->second.second << std::endl;
it->second.first->notifyDataStatus(it->first, it->second.second) ;
}
#warning should we also clean incoming data pipes?
if(items_deleted)
_changed = true ;
@ -1159,7 +1232,7 @@ bool p3GRouter::sliceDataItem(RsGRouterAbstractMsgItem *item,std::list<RsGRouter
// calling client. In case of error, all allocated memory is deleted.
#ifdef GROUTER_DEBUG
std::cerr << "p3GRouter::sendDataInTunnel()" << std::endl;
std::cerr << "p3GRouter::sliceDataItem()" << std::endl;
std::cerr << "item dump before send:" << std::endl;
item->print(std::cerr, 2) ;
#endif
@ -1362,7 +1435,10 @@ void p3GRouter::handleIncomingDataItem(RsGRouterGenericDataItem *data_item)
}
std::cerr << " storing incoming route: from " << data_item->PeerId() << std::endl;
_pending_messages[data_item->routing_id].incoming_routes.insert(data_item->PeerId()) ;
#warning we should make sure there's no duplicates. Possibly turn RsTlvIdSet.ids into a std::set!
if(!mTurtle->isTurtlePeer(data_item->PeerId()))
_pending_messages[data_item->routing_id].incoming_routes.ids.push_back(data_item->PeerId()) ;
return ;
}
@ -1439,11 +1515,11 @@ void p3GRouter::handleIncomingDataItem(RsGRouterGenericDataItem *data_item)
if(ok)
{
#ifdef GROUTER_DEBUG
std::cerr << " sent signed receipt in tunnel " << data_item->PeerId() << std::endl;
std::cerr << " sent signed receipt to " << data_item->PeerId() << std::endl;
#endif
}
else
std::cerr << " sending signed receipt in tunnel " << data_item->PeerId() << ": FAILED." << std::endl;
std::cerr << " sending signed receipt to " << data_item->PeerId() << ": FAILED." << std::endl;
}
bool p3GRouter::locked_getClientAndServiceId(const TurtleFileHash& hash, const RsGxsId& destination_key, GRouterClientService *& client, GRouterServiceId& service_id)
@ -1737,7 +1813,7 @@ bool p3GRouter::sendData(const RsGxsId& destination,const GRouterServiceId& clie
info.last_friend_sent_TS = 0 ;
info.last_tunnel_request_TS = 0 ;
info.sending_attempts = 0 ;
info.routing_flags = GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS
info.routing_flags = GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN
| GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS
| GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS ;
info.received_time_TS = now ;
@ -1958,6 +2034,7 @@ void p3GRouter::debugDump()
grouter_debug() << " Received : " << now - it->second.received_time_TS << " secs ago.";
grouter_debug() << " Last tunnel sent: " << now - it->second.last_tunnel_sent_TS << " secs ago.";
grouter_debug() << " Last friend sent: " << now - it->second.last_friend_sent_TS << " secs ago.";
grouter_debug() << " Transaction TS : " << now - it->second.data_transaction_TS << " secs ago.";
grouter_debug() << " Data Status : " << statusString[it->second.data_status] << std::endl;
grouter_debug() << " Tunl Status : " << statusString[it->second.tunnel_status] << std::endl;
grouter_debug() << " Receipt ok : " << (it->second.receipt_item != NULL) << std::endl;
@ -1983,8 +2060,8 @@ void p3GRouter::debugDump()
grouter_debug() << " Routing matrix: " << std::endl;
// if(_debug_enabled)
// _routing_matrix.debugDump() ;
if(_debug_enabled)
_routing_matrix.debugDump() ;
}