async distant messaging should work, but needs soem more testing. Use with care

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@8139 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2015-04-17 21:37:26 +00:00
parent c9d5c7b3cb
commit 6b84be37de
4 changed files with 54 additions and 34 deletions

View File

@ -162,9 +162,9 @@ bool GRouterMatrix::computeRoutingProbabilities(const GRouterKeyId& key_id, cons
if(it2 == _time_combined_hits.end()) if(it2 == _time_combined_hits.end())
{ {
// The key is not known. In this case, we return equal probabilities for all peers. // The key is not known. In this case, we return a zero probability for all peers.
// //
float p = 1.0f / friends.size() ; float p = 0.0f;//1.0f / friends.size() ;
probas.clear() ; probas.clear() ;
probas.resize(friends.size(),p) ; probas.resize(friends.size(),p) ;

View File

@ -118,6 +118,6 @@ public:
static const uint32_t ROUTING_FLAGS_ALLOW_TUNNELS = 0x0001; static const uint32_t ROUTING_FLAGS_ALLOW_TUNNELS = 0x0001;
static const uint32_t ROUTING_FLAGS_ALLOW_FRIENDS = 0x0002; static const uint32_t ROUTING_FLAGS_ALLOW_FRIENDS = 0x0002;
static const uint32_t ROUTING_FLAGS_IS_ORIGIN = 0x0003; static const uint32_t ROUTING_FLAGS_IS_ORIGIN = 0x0004;
}; };

View File

@ -254,7 +254,7 @@
#include "grouterclientservice.h" #include "grouterclientservice.h"
/**********************/ /**********************/
//#define GROUTER_DEBUG #define GROUTER_DEBUG
/**********************/ /**********************/
static const uint32_t MAX_TUNNEL_WAIT_TIME = 60 ; // wait for 60 seconds at most for a tunnel response. static const uint32_t MAX_TUNNEL_WAIT_TIME = 60 ; // wait for 60 seconds at most for a tunnel response.
@ -264,6 +264,7 @@ static const uint32_t TUNNEL_OK_WAIT_TIME = 3 ; // wait
static const uint32_t MAX_GROUTER_DATA_SIZE = 2*1024*1024 ; // 2MB size limit. This is of course arbitrary. static const uint32_t MAX_GROUTER_DATA_SIZE = 2*1024*1024 ; // 2MB size limit. This is of course arbitrary.
static const uint32_t MAX_RECEIPT_WAIT_TIME = 20 ; // wait for at most 20 secs for a receipt. If not, cancel. static const uint32_t MAX_RECEIPT_WAIT_TIME = 20 ; // wait for at most 20 secs for a receipt. If not, cancel.
static const uint32_t MAX_TRANSACTION_ACK_WAITING_TIME = 60 ; // wait for at most 60 secs for a ACK. If not restart the transaction. static const uint32_t MAX_TRANSACTION_ACK_WAITING_TIME = 60 ; // wait for at most 60 secs for a ACK. If not restart the transaction.
static const uint32_t DIRECT_FRIEND_TRY_DELAY = 20 ; // wait for 20 secs if no friends available, then try tunnels.
const std::string p3GRouter::SERVICE_INFO_APP_NAME = "Global Router" ; const std::string p3GRouter::SERVICE_INFO_APP_NAME = "Global Router" ;
@ -852,15 +853,16 @@ if(!_pending_messages.empty())
it->second.tunnel_status = RS_GROUTER_TUNNEL_STATUS_UNMANAGED ; it->second.tunnel_status = RS_GROUTER_TUNNEL_STATUS_UNMANAGED ;
} }
else if(it->second.data_status == RS_GROUTER_DATA_STATUS_SENT && it->second.last_tunnel_sent_TS + MAX_RECEIPT_WAIT_TIME < now) else if(it->second.data_status == RS_GROUTER_DATA_STATUS_SENT)// && it->second.last_tunnel_sent_TS + MAX_RECEIPT_WAIT_TIME < now)
{ {
#ifdef GROUTER_DEBUG #ifdef GROUTER_DEBUG
std::cerr << " closing pending tunnels." << std::endl; std::cerr << " closing pending tunnels." << std::endl;
#endif #endif
mTurtle->stopMonitoringTunnels(it->second.tunnel_hash) ; mTurtle->stopMonitoringTunnels(it->second.tunnel_hash) ;
it->second.routing_flags &= ~GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS ;
it->second.tunnel_status = RS_GROUTER_TUNNEL_STATUS_UNMANAGED ; //it->second.tunnel_status = RS_GROUTER_TUNNEL_STATUS_UNMANAGED ;
it->second.data_status = RS_GROUTER_DATA_STATUS_PENDING ; //it->second.data_status = RS_GROUTER_DATA_STATUS_PENDING ;
} }
#ifdef GROUTER_DEBUG #ifdef GROUTER_DEBUG
else else
@ -937,25 +939,31 @@ void p3GRouter::routePendingObjects()
std::list<RsPeerId> peers ; std::list<RsPeerId> peers ;
// For now, disable tunnels. We'll first check that the good old tunnel system works as before. if(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS)
if(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS) locked_collectAvailableTunnels(it->second.tunnel_hash,peers);
locked_collectAvailableTunnels(it->second.tunnel_hash,peers);
// For now, disable friends. We'll first check that the good old tunnel system works as before. // For now, disable friends. We'll first check that the good old tunnel system works as before.
//if(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS) if(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS)
// locked_collectAvailableFriends(it->second.data_item->destination_key,peers, it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN); locked_collectAvailableFriends(it->second.data_item->destination_key,peers, it->second.incoming_routes.ids, it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN);
if(peers.empty()) if(peers.empty())
{
std::cerr << " no direct friends available" << std::endl;
if(it->second.received_time_TS + DIRECT_FRIEND_TRY_DELAY < now &&
!(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS))
{ {
std::cerr << " no tunnel nor friends available" << std::endl; std::cerr << " enabling tunnels for this message." << std::endl;
continue ; it->second.routing_flags |= GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS ;
} }
continue ;
}
// slice the data appropriately and send. // slice the data appropriately and send.
std::list<RsGRouterTransactionChunkItem*> chunks ; std::list<RsGRouterTransactionChunkItem*> chunks ;
sliceDataItem(it->second.data_item,chunks) ; sliceDataItem(it->second.data_item,chunks) ;
#ifdef GROUTER_DEBUG #ifdef GROUTER_DEBUG
if(!peers.empty()) if(!peers.empty())
@ -990,7 +998,7 @@ void p3GRouter::routePendingObjects()
std::list<RsGRouterTransactionChunkItem*> chunks ; std::list<RsGRouterTransactionChunkItem*> chunks ;
sliceDataItem(it->second.receipt_item,chunks) ; sliceDataItem(it->second.receipt_item,chunks) ;
for(std::list<RsPeerId>::iterator it2=it->second.incoming_routes.ids.begin();it2!=it->second.incoming_routes.ids.end();) for(std::set<RsPeerId>::iterator it2=it->second.incoming_routes.ids.begin();it2!=it->second.incoming_routes.ids.end();)
if(mServiceControl->isPeerConnected(getServiceInfo().mServiceType,*it2)) if(mServiceControl->isPeerConnected(getServiceInfo().mServiceType,*it2))
{ {
std::cerr << " sending receipt back to " << *it2 << " which is online." << std::endl; std::cerr << " sending receipt back to " << *it2 << " which is online." << std::endl;
@ -999,7 +1007,7 @@ void p3GRouter::routePendingObjects()
locked_sendTransactionData(*it2,*(*it3) ) ; locked_sendTransactionData(*it2,*(*it3) ) ;
// then remove from the set. // then remove from the set.
std::list<RsPeerId>::iterator it2tmp = it2 ; std::set<RsPeerId>::iterator it2tmp = it2 ;
++it2tmp ; ++it2tmp ;
it->second.incoming_routes.ids.erase(it2) ; it->second.incoming_routes.ids.erase(it2) ;
it2 = it2tmp ; it2 = it2tmp ;
@ -1020,7 +1028,7 @@ void p3GRouter::routePendingObjects()
} }
} }
void p3GRouter::locked_collectAvailableFriends(const GRouterKeyId& gxs_id,std::list<RsPeerId>& friend_peers,bool is_origin) void p3GRouter::locked_collectAvailableFriends(const GRouterKeyId& gxs_id,std::list<RsPeerId>& friend_peers,const std::set<RsPeerId>& incoming_routes,bool is_origin)
{ {
// The strategy is the following: // The strategy is the following:
// if origin // if origin
@ -1034,20 +1042,28 @@ void p3GRouter::locked_collectAvailableFriends(const GRouterKeyId& gxs_id,std::l
std::vector<float> probas; std::vector<float> probas;
std::vector<RsPeerId> tmp_peers; std::vector<RsPeerId> tmp_peers;
// remove previous peers
for(std::set<RsPeerId>::const_iterator it(ids.begin());it!=ids.end();++it) for(std::set<RsPeerId>::const_iterator it(ids.begin());it!=ids.end();++it)
tmp_peers.push_back(*it) ; if(incoming_routes.find(*it) == incoming_routes.end())
tmp_peers.push_back(*it) ;
if(tmp_peers.empty())
return ;
_routing_matrix.computeRoutingProbabilities(gxs_id, tmp_peers, probas) ; _routing_matrix.computeRoutingProbabilities(gxs_id, tmp_peers, probas) ;
#ifdef GROUTER_DEBUG #ifdef GROUTER_DEBUG
std::cerr << "locked_getAvailableFriends()" << std::endl; std::cerr << "locked_getAvailableFriends()" << std::endl;
std::cerr << " getting connected friends, computing routing probabilities" << std::endl; std::cerr << " getting connected friends, computing routing probabilities" << std::endl;
for(uint32_t i=0;i<ids.size();++i) for(uint32_t i=0;i<tmp_peers.size();++i)
std::cerr << " " << tmp_peers[i] << ", probability: " << probas[i] << std::endl; std::cerr << " " << tmp_peers[i] << ", probability: " << probas[i] << std::endl;
#endif #endif
uint32_t max_count = is_origin?3:1 ; uint32_t max_count = is_origin?3:1 ;
float probability_threshold = is_origin?0.0:0.5 ; float probability_threshold = is_origin?0.0:0.5 ;
std::cerr << " position at origin: " << is_origin << " => mac_count=" << max_count << ", proba threashold=" << probability_threshold << std::endl;
std::vector<std::pair<float,RsPeerId> > mypairs ; std::vector<std::pair<float,RsPeerId> > mypairs ;
for(uint32_t i=0;i<tmp_peers.size();++i) for(uint32_t i=0;i<tmp_peers.size();++i)
@ -1062,7 +1078,13 @@ void p3GRouter::locked_collectAvailableFriends(const GRouterKeyId& gxs_id,std::l
for(std::vector<std::pair<float,RsPeerId> >::const_reverse_iterator it = mypairs.rbegin();it!=mypairs.rend() && n<max_count;++it) for(std::vector<std::pair<float,RsPeerId> >::const_reverse_iterator it = mypairs.rbegin();it!=mypairs.rend() && n<max_count;++it)
if( (*it).first >= probability_threshold ) if( (*it).first >= probability_threshold )
{
friend_peers.push_back( (*it).second ), ++n ; friend_peers.push_back( (*it).second ), ++n ;
std::cerr << " keeping " << (*it).second << std::endl;
if(!is_origin) // only collect one peer if we're not at origin.
break ;
}
} }
void p3GRouter::locked_collectAvailableTunnels(const TurtleFileHash& hash,std::list<RsPeerId>& tunnel_peers) void p3GRouter::locked_collectAvailableTunnels(const TurtleFileHash& hash,std::list<RsPeerId>& tunnel_peers)
@ -1450,7 +1472,7 @@ void p3GRouter::handleIncomingDataItem(RsGRouterGenericDataItem *data_item)
GRouterRoutingInfo& info(_pending_messages[data_item->routing_id]) ; GRouterRoutingInfo& info(_pending_messages[data_item->routing_id]) ;
info.data_item = data_item ; info.data_item = data_item->duplicate() ;
info.receipt_item = NULL ; info.receipt_item = NULL ;
info.data_status = RS_GROUTER_DATA_STATUS_PENDING ; info.data_status = RS_GROUTER_DATA_STATUS_PENDING ;
info.tunnel_status = RS_GROUTER_TUNNEL_STATUS_UNMANAGED ; info.tunnel_status = RS_GROUTER_TUNNEL_STATUS_UNMANAGED ;
@ -1458,16 +1480,15 @@ void p3GRouter::handleIncomingDataItem(RsGRouterGenericDataItem *data_item)
info.last_friend_sent_TS = 0 ; info.last_friend_sent_TS = 0 ;
info.last_tunnel_request_TS = 0 ; info.last_tunnel_request_TS = 0 ;
info.sending_attempts = 0 ; info.sending_attempts = 0 ;
info.routing_flags = GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS | GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS ; info.routing_flags = GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS ; // don't allow tunnels just yet
info.received_time_TS = time(NULL) ; info.received_time_TS = time(NULL) ;
info.tunnel_hash = makeTunnelHash(data_item->destination_key,data_item->service_id) ; info.tunnel_hash = makeTunnelHash(data_item->destination_key,data_item->service_id) ;
} }
std::cerr << " storing incoming route: from " << data_item->PeerId() << std::endl; std::cerr << " storing incoming route: from " << data_item->PeerId() << std::endl;
#warning we should make sure there is no duplicates. Possibly turn RsTlvIdSet.ids into a std::set!
if(!mTurtle->isTurtlePeer(data_item->PeerId())) if(!mTurtle->isTurtlePeer(data_item->PeerId()))
_pending_messages[data_item->routing_id].incoming_routes.ids.push_back(data_item->PeerId()) ; _pending_messages[data_item->routing_id].incoming_routes.ids.insert(data_item->PeerId()) ;
return ; return ;
} }
@ -1842,15 +1863,13 @@ bool p3GRouter::sendData(const RsGxsId& destination,const GRouterServiceId& clie
info.last_friend_sent_TS = 0 ; info.last_friend_sent_TS = 0 ;
info.last_tunnel_request_TS = 0 ; info.last_tunnel_request_TS = 0 ;
info.sending_attempts = 0 ; info.sending_attempts = 0 ;
info.routing_flags = GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN info.routing_flags = GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN | GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS ;// don't allow tunnels just yet
| GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS
| GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS ;
info.received_time_TS = now ; info.received_time_TS = now ;
info.tunnel_hash = makeTunnelHash(destination,client_id) ; info.tunnel_hash = makeTunnelHash(destination,client_id) ;
#ifdef GROUTER_DEBUG #ifdef GROUTER_DEBUG
grouter_debug() << "p3GRouter::sendGRouterData(): pushing the following item in the msg pending list:" << std::endl; grouter_debug() << "p3GRouter::sendGRouterData(): pushing the following item in the msg pending list:" << std::endl;
grouter_debug() << " routing id = " << propagation_id << std::endl; grouter_debug() << " routing id = " << std::hex << propagation_id << std::dec << std::endl;
grouter_debug() << " data_item.size = " << info.data_item->data_size << std::endl; grouter_debug() << " data_item.size = " << info.data_item->data_size << std::endl;
grouter_debug() << " data_item.byte = " << RsDirUtil::sha1sum(info.data_item->data_bytes,info.data_item->data_size) << std::endl; grouter_debug() << " data_item.byte = " << RsDirUtil::sha1sum(info.data_item->data_bytes,info.data_item->data_size) << std::endl;
grouter_debug() << " destination = " << data_item->destination_key << std::endl; grouter_debug() << " destination = " << data_item->destination_key << std::endl;
@ -1862,6 +1881,7 @@ bool p3GRouter::sendData(const RsGxsId& destination,const GRouterServiceId& clie
grouter_debug() << " recv time = " << info.received_time_TS << std::endl; grouter_debug() << " recv time = " << info.received_time_TS << std::endl;
grouter_debug() << " client id = " << std::hex << data_item->service_id << std::dec << std::endl; grouter_debug() << " client id = " << std::hex << data_item->service_id << std::dec << std::endl;
grouter_debug() << " tunnel hash = " << info.tunnel_hash << std::endl; grouter_debug() << " tunnel hash = " << info.tunnel_hash << std::endl;
grouter_debug() << " routing flags = " << info.routing_flags << std::endl;
#endif #endif
{ {

View File

@ -264,7 +264,7 @@ private:
bool locked_sendTransactionData(const RsPeerId& pid,const RsGRouterTransactionItem& item); bool locked_sendTransactionData(const RsPeerId& pid,const RsGRouterTransactionItem& item);
void locked_collectAvailableFriends(const GRouterKeyId &gxs_id,std::list<RsPeerId>& friend_peers, bool is_origin); void locked_collectAvailableFriends(const GRouterKeyId &gxs_id,std::list<RsPeerId>& friend_peers, const std::set<RsPeerId>& incoming_routes,bool is_origin);
void locked_collectAvailableTunnels(const TurtleFileHash& hash,std::list<RsPeerId>& tunnel_peers); void locked_collectAvailableTunnels(const TurtleFileHash& hash,std::list<RsPeerId>& tunnel_peers);
//===================================================// //===================================================//