implemented a better management of tunnel requests

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@4093 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2011-03-15 23:09:31 +00:00
parent c42b326315
commit 9e2be4b5be
2 changed files with 159 additions and 51 deletions

View File

@ -58,16 +58,33 @@
#include "util/rsrandom.h"
#include "pqi/pqinetwork.h"
#ifdef TUNNEL_STATISTICS
static std::vector<int> TS_tunnel_length(8,0) ;
static std::map<TurtleFileHash, std::vector<std::pair<time_t,TurtleTunnelRequestId> > > TS_request_time_stamps ;
static std::map<TurtleTunnelRequestId, std::vector<time_t> > TS_request_bounces ;
void TS_dumpState() ;
#endif
// These number may be quite important. I setup them with sensible values, but
// an in-depth test would be better to get an idea of what the ideal values
// could ever be.
//
static const time_t TUNNEL_REQUESTS_LIFE_TIME = 30 ; /// life time for tunnel requests in the cache.
static const time_t SEARCH_REQUESTS_LIFE_TIME = 30 ; /// life time for search requests in the cache
// update of 14-03-11:
// - I raised the cache time for tunnel requests. This avoids inconsistencies such as:
// * tunnel requests bouncing back while the original request is not in the cache anymore
// * special case of this for own file transfer: an outgoing tunnel is built with no end.
// - changed tunnel speed estimate time lapse to 5 secs. Too small a value favors high variations.
// - the max number of tunnel requests per second is now enforced. It was before defaulting to
// QUEUE_LENGTH*0.1, meaning 0.5. I set it to 0.1.
//
static const time_t TUNNEL_REQUESTS_LIFE_TIME = 60 ; /// life time for tunnel requests in the cache.
static const time_t SEARCH_REQUESTS_LIFE_TIME = 60 ; /// life time for search requests in the cache
static const time_t REGULAR_TUNNEL_DIGGING_TIME = 300 ; /// maximum interval between two tunnel digging campaigns.
static const time_t MAXIMUM_TUNNEL_IDLE_TIME = 60 ; /// maximum life time of an unused tunnel.
static const time_t TUNNEL_MANAGEMENT_LAPS_TIME = 10 ; /// look into tunnels regularly every 10 sec.
static const time_t TUNNEL_SPEED_ESTIMATE_LAPSE = 2 ; /// estimate tunnel speed every 2 seconds
static const time_t EMPTY_TUNNELS_DIGGING_TIME = 50 ; /// look into tunnels regularly every 50 sec.
static const time_t TUNNEL_SPEED_ESTIMATE_LAPSE = 5 ; /// estimate tunnel speed every 5 seconds
static const time_t TUNNEL_CLEANING_LAPS_TIME = 10 ; /// clean tunnels every 10 secs
static const uint32_t MAX_TUNNEL_REQS_PER_SECOND= 1 ; /// maximum number of tunnel requests per second. Was 0.5 before
p3turtle::p3turtle(p3ConnectMgr *cm,ftServer *fs)
:p3Service(RS_SERVICE_TYPE_TURTLE), p3Config(CONFIG_TYPE_TURTLE), mConnMgr(cm)
@ -94,12 +111,18 @@ int p3turtle::tick()
time_t now = time(NULL) ;
bool should_manage_tunnels,should_autowash,should_estimatespeed ;
#ifdef TUNNEL_STATISTICS
static time_t last_now = now ;
if(now - last_now > 2)
std::cerr << "******************* WARNING: now - last_now = " << now - last_now << std::endl;
last_now = now ;
#endif
bool should_autowash,should_estimatespeed ;
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
should_manage_tunnels = now > TUNNEL_MANAGEMENT_LAPS_TIME+_last_tunnel_management_time || _force_digg_new_tunnels ;
should_autowash = now > 10+_last_clean_time ;
should_autowash = now > TUNNEL_CLEANING_LAPS_TIME+_last_clean_time ;
should_estimatespeed = now >= TUNNEL_SPEED_ESTIMATE_LAPSE + _last_tunnel_speed_estimate_time ;
}
@ -108,7 +131,7 @@ int p3turtle::tick()
// - we digg new tunnels each time a new peer connects
// - we digg new tunnels each time a new hash is asked for
//
if(should_manage_tunnels)
if(now >= _last_tunnel_management_time+1) // call every second
{
#ifdef P3TURTLE_DEBUG
std::cerr << "Calling tunnel management." << std::endl ;
@ -140,6 +163,18 @@ int p3turtle::tick()
_last_tunnel_speed_estimate_time = now ;
}
#ifdef TUNNEL_STATISTICS
// Dump state for debugging, every 20 sec.
//
static time_t TS_last_dump = time(NULL) ;
if(now > 20+TS_last_dump)
{
TS_last_dump = now ;
TS_dumpState() ;
}
#endif
#ifdef P3TURTLE_DEBUG
// Dump state for debugging, every 20 sec.
//
@ -159,6 +194,7 @@ int p3turtle::tick()
// -----------------------------------------------------------------------------------//
//
#ifdef TO_REMOVE
// This method handles peer connexion/deconnexion
// If A connects, new tunnels should be initiated from A
// If A disconnects, the tunnels passed through A should be closed.
@ -181,6 +217,7 @@ void p3turtle::statusChange(const std::list<pqipeer> &plist) // derived from pqi
if ((pit->state & RS_PEER_S_FRIEND) && (pit->actions & RS_PEER_CONNECTED))
_force_digg_new_tunnels = true ;
}
#endif
// adds a virtual peer to the list that is communicated ot ftController.
//
@ -218,40 +255,55 @@ void p3turtle::getVirtualPeersList(std::list<pqipeer>& list)
// - new peers have connected. The resulting tunnels should be checked against doubling.
// - new hashes are submitted for handling.
//
class hashPairComparator
{
public:
virtual bool operator()(const std::pair<TurtleFileHash,time_t>& p1,const std::pair<TurtleFileHash,time_t>& p2) const
{
return p1.second < p2.second ;
}
};
void p3turtle::manageTunnels()
{
// Collect hashes for which tunnel digging is necessary / recommended
// Collect hashes for which tunnel digging is necessary / recommended. Hashes get in the list for two reasons:
// - the hash has no tunnel -> tunnel digging every EMPTY_TUNNELS_DIGGING_TIME seconds
// - the hash hasn't been tunneled for more than REGULAR_TUNNEL_DIGGING_TIME seconds, even if downloading.
//
// Candidate hashes are sorted, by olderness. The older gets tunneled first. At most MAX_TUNNEL_REQS_PER_SECOND are
// treated at once, as this method is called every second.
// Note: Because REGULAR_TUNNEL_DIGGING_TIME is larger than EMPTY_TUNNELS_DIGGING_TIME, files being downloaded get
// re-tunneled in priority. As this happens less, they don't obliterate tunneling for files that have no tunnels yet.
std::vector<std::pair<TurtleFileHash,time_t> > hashes_to_digg ;
time_t now = time(NULL) ;
std::vector<TurtleFileHash> hashes_to_digg ;
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
time_t now = time(NULL) ;
bool tunnel_campain = false ;
if(now > _last_tunnel_campaign_time + REGULAR_TUNNEL_DIGGING_TIME)
{
#ifdef P3TURTLE_DEBUG
std::cerr << " Tunnel management: flaging all hashes for tunnels digging." << std::endl ;
#endif
tunnel_campain = true ;
_last_tunnel_campaign_time = now ;
}
// digg new tunnels if no tunnels are available and force digg new tunnels at regular (large) interval
//
for(std::map<TurtleFileHash,TurtleFileHashInfo>::const_iterator it(_incoming_file_hashes.begin());it!=_incoming_file_hashes.end();++it)
if(it->second.tunnels.empty() // digg new tunnels if no tunnels are available
|| _force_digg_new_tunnels // digg new tunnels when forced to (e.g. a new peer has connected)
|| tunnel_campain) // force digg new tunnels at regular (large) interval
if( (it->second.tunnels.empty() && now >= it->second.last_digg_time+EMPTY_TUNNELS_DIGGING_TIME) || now >= it->second.last_digg_time + REGULAR_TUNNEL_DIGGING_TIME)
{
#ifdef P3TURTLE_DEBUG
std::cerr << " Tunnel management: digging new tunnels for hash " << it->first << "." << std::endl ;
std::cerr << "pushed hash " << it->first << ", for digging. Old = " << now - it->second.last_digg_time << std::endl;
#endif
hashes_to_digg.push_back(it->first) ;
hashes_to_digg.push_back(std::pair<TurtleFileHash,time_t>(it->first,it->second.last_digg_time)) ;
}
_force_digg_new_tunnels = false ;
}
#ifdef TUNNEL_STATISTICS
std::cerr << hashes_to_digg.size() << " hashes candidate for tunnel digging." << std::endl;
#endif
for(unsigned int i=0;i<hashes_to_digg.size();++i)
diggTunnel(hashes_to_digg[i]) ;
std::sort(hashes_to_digg.begin(),hashes_to_digg.end(),hashPairComparator()) ;
for(unsigned int i=0;i<MAX_TUNNEL_REQS_PER_SECOND && i<hashes_to_digg.size();++i)// Digg at most n tunnels per second.
{
#ifdef TUNNEL_STATISTICS
std::cerr << "!!!!!!!!!!!! Digging for " << hashes_to_digg[i].first << std::endl;
#endif
diggTunnel(hashes_to_digg[i].first) ;
}
}
void p3turtle::estimateTunnelSpeeds()
@ -395,7 +447,7 @@ void p3turtle::locked_closeTunnel(TurtleTunnelId tid,std::vector<std::pair<Turtl
{
// This is closing a given tunnel, removing it from file sources, and from the list of tunnels of its
// corresponding file hash. In the original turtle4privacy paradigm, they also send back and forward
// tunnel closing commands. I'm not sure this is necessary, because if a tunnel is closed somewhere, it's
// tunnel closing commands. In our case, this is not necessary, because if a tunnel is closed somewhere, its
// source is not going to be used and the tunnel will eventually disappear.
//
std::map<TurtleTunnelId,TurtleTunnel>::iterator it(_local_tunnels.find(tid)) ;
@ -552,14 +604,6 @@ bool p3turtle::loadList(std::list<RsItem*> load)
uint32_t p3turtle::generateRandomRequestId()
{
return RSRandom::random_u32() ;
// RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
//
// static uint64_t s = time(NULL) ;
//
// s = s*1664525 + 1013904223 ; // pseudo random number generator from Wikipedia/Numerical Recipies.
//
// return (uint32_t)(s & 0x00000000ffffffff) ;
}
uint32_t p3turtle::generatePersonalFilePrint(const TurtleFileHash& hash,bool b)
@ -985,7 +1029,7 @@ void p3turtle::handleRecvFileData(RsTurtleFileDataItem *item)
size = hash_info.size ;
// also update the hash time stamp to show that it's actually being downloaded.
it->second.time_stamp = time(NULL) ;
//it->second.time_stamp = time(NULL) ;
}
_ft_server->getMultiplexer()->recvData(vpid,hash,size,item->chunk_offset,item->chunk_size,item->chunk_data) ;
@ -1411,11 +1455,11 @@ bool p3turtle::isOnline(const std::string& peer_id) const
TurtleRequestId p3turtle::diggTunnel(const TurtleFileHash& hash)
{
#ifdef P3TURTLE_DEBUG
std::cerr << "performing tunnel request. OwnId = " << mConnMgr->getOwnId() << std::endl ;
std::cerr << "DiggTunnel: performing tunnel request. OwnId = " << mConnMgr->getOwnId() << " for hash=" << hash << std::endl ;
#endif
while(mConnMgr->getOwnId() == "")
{
std::cerr << "... waitting for connect manager to form own id." << std::endl ;
std::cerr << "... waiting for connect manager to form own id." << std::endl ;
#ifdef WIN32
Sleep(1000) ;
#else
@ -1430,6 +1474,7 @@ TurtleRequestId p3turtle::diggTunnel(const TurtleFileHash& hash)
// Store the request id, so that we can find the hash back when we get the response.
//
_incoming_file_hashes[hash].last_request = id ;
_incoming_file_hashes[hash].last_digg_time = time(NULL) ;
}
// Form a tunnel request packet that simulates a request from us.
@ -1444,8 +1489,10 @@ TurtleRequestId p3turtle::diggTunnel(const TurtleFileHash& hash)
// send it
#ifdef TUNNEL_STATISTICS
TS_request_bounces[item->request_id].clear() ; // forces initialization
#endif
handleTunnelRequest(item) ;
delete item ;
return id ;
@ -1457,6 +1504,12 @@ void p3turtle::handleTunnelRequest(RsTurtleOpenTunnelItem *item)
std::cerr << "Received tunnel request from peer " << item->PeerId() << ": " << std::endl ;
item->print(std::cerr,0) ;
#endif
#ifdef TUNNEL_STATISTICS
if(TS_request_bounces.find(item->request_id) != TS_request_bounces.end())
TS_request_bounces[item->request_id].push_back(time(NULL)) ;
#endif
// If the item contains an already handled tunnel request, give up. This
// happens when the same tunnel request gets relayed by different peers. We
// have to be very careful here, not to call ftController while mTurtleMtx is
@ -1479,6 +1532,13 @@ void p3turtle::handleTunnelRequest(RsTurtleOpenTunnelItem *item)
TurtleRequestInfo& req( _tunnel_requests_origins[item->request_id] ) ;
req.origin = item->PeerId() ;
req.time_stamp = time(NULL) ;
#ifdef TUNNEL_STATISTICS
std::cerr << "storing tunnel request " << (void*)(item->request_id) << std::endl ;
++TS_tunnel_length[item->depth] ;
TS_request_time_stamps[item->file_hash].push_back(std::pair<time_t,TurtleTunnelRequestId>(time(NULL),item->request_id)) ;
#endif
}
// If it's not for us, perform a local search. If something found, forward the search result back.
@ -1629,8 +1689,9 @@ void p3turtle::handleTunnelResult(RsTurtleTunnelOkItem *item)
#endif
// Tunnel is ending here. Add it to the list of tunnels for the given hash.
// 1 - find which file hash issued this request. This is not costly, because there is not too much file hashes to be active
// at a time, and this mostly prevents from sending the hash back in the tunnel.
// 1 - find which file hash issued this request. This is not costly,
// because there is not too much file hashes to be active at a time,
// and this mostly prevents from sending the hash back in the tunnel.
bool found = false ;
for(std::map<TurtleFileHash,TurtleFileHashInfo>::iterator it(_incoming_file_hashes.begin());it!=_incoming_file_hashes.end();++it)
@ -1858,12 +1919,11 @@ void p3turtle::monitorFileTunnels(const std::string& name,const std::string& fil
// No tunnels at start, but this triggers digging new tunnels.
//
_incoming_file_hashes[file_hash].tunnels.clear();
_force_digg_new_tunnels = true ;
// also should send associated request to the file transfer module.
_incoming_file_hashes[file_hash].size = size ;
_incoming_file_hashes[file_hash].name = name ;
_incoming_file_hashes[file_hash].time_stamp = time(NULL) ;
_incoming_file_hashes[file_hash].last_digg_time = RSRandom::random_u32()%10 ;
}
IndicateConfigChanged() ; // initiates saving of handled hashes.
@ -1889,7 +1949,7 @@ bool p3turtle::performLocalHashSearch(const TurtleFileHash& hash,FileInfo& info)
}
static std::string printFloatNumber(float num,bool friendly=false)
{;
{
std::ostringstream out ;
if(friendly)
@ -1950,7 +2010,7 @@ void p3turtle::getInfo( std::vector<std::vector<std::string> >& hashes_info,
hashes.push_back(it->first) ;
hashes.push_back(it->second.name) ;
hashes.push_back(printNumber(it->second.tunnels.size())) ;
hashes.push_back(printNumber(now - it->second.time_stamp)+" secs ago") ;
//hashes.push_back(printNumber(now - it->second.time_stamp)+" secs ago") ;
}
tunnels_info.clear();
@ -2056,3 +2116,44 @@ void p3turtle::dumpState()
}
#endif
#ifdef TUNNEL_STATISTICS
void p3turtle::TS_dumpState()
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
time_t now = time(NULL) ;
std::cerr << "Dumping tunnel statistics:" << std::endl;
std::cerr << "TR Bounces: " << TS_request_bounces.size() << std::endl;
for(std::map<TurtleTunnelRequestId,std::vector<time_t> >::const_iterator it(TS_request_bounces.begin());it!=TS_request_bounces.end();++it)
{
std::cerr << (void*)it->first << ": " ;
for(uint32_t i=0;i<it->second.size();++i)
std::cerr << it->second[i] - it->second[0] << " " ;
std::cerr << std::endl;
}
std::cerr << "TR in cache: " << _tunnel_requests_origins.size() << std::endl;
std::cerr << "TR by size: " ;
for(int i=0;i<8;++i)
std::cerr << "N(" << i << ")=" << TS_tunnel_length[i] << ", " ;
std::cerr << std::endl;
std::cerr << "Total different requested files: " << TS_request_time_stamps.size() << std::endl;
for(std::map<TurtleFileHash, std::vector<std::pair<time_t,TurtleTunnelRequestId> > >::const_iterator it(TS_request_time_stamps.begin());it!=TS_request_time_stamps.end();++it)
{
std::cerr << "hash = " << it->first << ": seconds ago: " ;
float average = 0 ;
for(uint32_t i=std::max(0,(int)it->second.size()-25);i<it->second.size();++i)
{
std::cerr << now - it->second[i].first << " (" << (void*)it->second[i].second << ") " ;
if(i>0)
average += it->second[i].first - it->second[i-1].first ;
}
if(it->second.size()>1)
std::cerr << ", average delay=" << average/(float)(it->second.size()-1) << std::endl;
else
std::cerr << std::endl;
}
}
#endif

View File

@ -148,6 +148,8 @@
#include "retroshare/rsturtle.h"
#include "rsturtleitem.h"
//#define TUNNEL_STATISTICS
class ftServer ;
class p3ConnectMgr;
class ftDataMultiplex;
@ -190,7 +192,7 @@ class TurtleFileHashInfo
TurtleRequestId last_request ; // last request for the tunnels of this hash
TurtleFileName name ;
time_t time_stamp ;
time_t last_digg_time ;
uint64_t size ;
};
@ -205,7 +207,7 @@ class TurtleFileHashInfo
// p3Config | ConfigChanged() | used to load/save .cfg file for turtle variales.
// -----------+------------------+------------------------------------------------------
//
class p3turtle: public p3Service, public pqiMonitor, public RsTurtle,/* public ftSearch */ public p3Config
class p3turtle: public p3Service, /*public pqiMonitor,*/ public RsTurtle,/* public ftSearch */ public p3Config
{
public:
p3turtle(p3ConnectMgr *cm,ftServer *m);
@ -242,11 +244,13 @@ class p3turtle: public p3Service, public pqiMonitor, public RsTurtle,/* public f
std::vector<std::vector<std::string> >&,
std::vector<std::vector<std::string> >&) const ;
#ifdef TO_REMOVE
/************* from pqiMonitor *******************/
/// Informs the turtle router that some peers are (dis)connected. This should initiate digging new tunnels,
/// and closing other tunnels.
///
virtual void statusChange(const std::list<pqipeer> &plist);
#endif
/************* from pqiMonitor *******************/
@ -402,6 +406,9 @@ class p3turtle: public p3Service, public pqiMonitor, public RsTurtle,/* public f
// debug function
void dumpState() ;
#endif
#ifdef TUNNEL_STATISTICS
void TS_dumpState();
#endif
};
#endif