- added a generic RsItem to the turtle router and the methods to route it. This makes the code much more elegant.

- suppressed a cross mutex lock bug that happened rarely while digging tunnels
- changed FileDetails in ftServer so that it's now possiblt to search for hashes of files being downloaded
- improved the search code in ftdatamultiplex
- added some comments to the turtle code



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@1964 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2010-01-02 21:30:19 +00:00
parent 03cbedb224
commit b83e894640
7 changed files with 388 additions and 292 deletions

View file

@ -515,11 +515,6 @@ uint32_t p3turtle::generatePersonalFilePrint(const TurtleFileHash& hash,bool b)
//
int p3turtle::handleIncoming()
{
#ifdef P3TURTLE_DEBUG
// std::cerr << "p3turtle::handleIncoming()";
// std::cerr << std::endl;
#endif
int nhandled = 0;
// While messages read
//
@ -529,32 +524,34 @@ int p3turtle::handleIncoming()
{
nhandled++;
switch(item->PacketSubType())
RsTurtleGenericTunnelItem *gti = dynamic_cast<RsTurtleGenericTunnelItem *>(item) ;
if(gti != NULL)
routeGenericTunnelItem(gti) ; /// Generic packets, that travel through established tunnels.
else /// These packets should be destroyed by the client.
{
case RS_TURTLE_SUBTYPE_STRING_SEARCH_REQUEST:
case RS_TURTLE_SUBTYPE_REGEXP_SEARCH_REQUEST: handleSearchRequest(dynamic_cast<RsTurtleSearchRequestItem *>(item)) ;
break ;
/// Special packets that require specific treatment, because tunnels do not exist for these packets.
/// These packets are destroyed here, after treatment.
//
switch(item->PacketSubType())
{
case RS_TURTLE_SUBTYPE_STRING_SEARCH_REQUEST:
case RS_TURTLE_SUBTYPE_REGEXP_SEARCH_REQUEST: handleSearchRequest(dynamic_cast<RsTurtleSearchRequestItem *>(item)) ;
break ;
case RS_TURTLE_SUBTYPE_SEARCH_RESULT : handleSearchResult(dynamic_cast<RsTurtleSearchResultItem *>(item)) ;
break ;
case RS_TURTLE_SUBTYPE_SEARCH_RESULT : handleSearchResult(dynamic_cast<RsTurtleSearchResultItem *>(item)) ;
break ;
case RS_TURTLE_SUBTYPE_OPEN_TUNNEL : handleTunnelRequest(dynamic_cast<RsTurtleOpenTunnelItem *>(item)) ;
break ;
case RS_TURTLE_SUBTYPE_OPEN_TUNNEL : handleTunnelRequest(dynamic_cast<RsTurtleOpenTunnelItem *>(item)) ;
break ;
case RS_TURTLE_SUBTYPE_TUNNEL_OK : handleTunnelResult(dynamic_cast<RsTurtleTunnelOkItem *>(item)) ;
break ;
case RS_TURTLE_SUBTYPE_FILE_REQUEST : handleRecvFileRequest(dynamic_cast<RsTurtleFileRequestItem *>(item)) ;
break ;
case RS_TURTLE_SUBTYPE_FILE_DATA : handleRecvFileData(dynamic_cast<RsTurtleFileDataItem *>(item)) ;
break ;
// Here will also come handling of file transfer requests, tunnel digging/closing, etc.
default:
std::cerr << "p3turtle::handleIncoming: Unknown packet subtype " << item->PacketSubType() << std::endl ;
case RS_TURTLE_SUBTYPE_TUNNEL_OK : handleTunnelResult(dynamic_cast<RsTurtleTunnelOkItem *>(item)) ;
break ;
default:
std::cerr << "p3turtle::handleIncoming: Unknown packet subtype " << item->PacketSubType() << std::endl ;
}
delete item;
}
delete item;
}
return nhandled;
@ -725,6 +722,88 @@ void p3turtle::handleSearchResult(RsTurtleSearchResultItem *item)
// --------------------------------- File Transfer. -------------------------------- //
// -----------------------------------------------------------------------------------//
// Routing of turtle tunnel items in a generic manner. Most tunnel packets will use this function, except packets designed for
// contructing the tunnels and searching, namely TurtleSearchRequests/Results and OpenTunnel/TunnelOkItems
//
void p3turtle::routeGenericTunnelItem(RsTurtleGenericTunnelItem *item)
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3Turtle: treating generic tunnel item:" << std::endl ;
item->print(std::cerr,1) ;
#endif
RsTurtleGenericTunnelItem::Direction direction = item->travelingDirection() ;
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
// look for the tunnel id.
//
std::map<TurtleTunnelId,TurtleTunnel>::iterator it(_local_tunnels.find(item->tunnelId())) ;
if(it == _local_tunnels.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle: got file map with unknown tunnel id " << (void*)item->tunnelId() << std::endl ;
#endif
return ;
}
TurtleTunnel& tunnel(it->second) ;
// Only file data transfer updates tunnels time_stamp field, to avoid maintaining tunnel that are incomplete.
if(item->shouldStampTunnel())
tunnel.time_stamp = time(NULL) ;
// Let's figure out whether this packet is for us or not.
if(direction == RsTurtleGenericTunnelItem::DIRECTION_CLIENT && tunnel.local_src != mConnMgr->getOwnId())
{
#ifdef P3TURTLE_DEBUG
std::cerr << " Forwarding generic item to peer " << tunnel.local_src << std::endl ;
#endif
item->PeerId(tunnel.local_src) ;
sendItem(item) ;
return ;
}
if(direction == RsTurtleGenericTunnelItem::DIRECTION_SERVER && tunnel.local_dst != mConnMgr->getOwnId())
{
#ifdef P3TURTLE_DEBUG
std::cerr << " Forwarding generic item to peer " << tunnel.local_dst << std::endl ;
#endif
item->PeerId(tunnel.local_dst) ;
sendItem(item) ;
return ;
}
}
// The packet was not forwarded, so it is for us. Let's treat it.
// This is done off-mutex, to avoid various deadlocks
//
if(direction == RsTurtleGenericTunnelItem::DIRECTION_SERVER)
switch(item->PacketSubType())
{
case RS_TURTLE_SUBTYPE_FILE_REQUEST: handleRecvFileRequest(dynamic_cast<RsTurtleFileRequestItem *>(item)) ;
break ;
default:
std::cerr << "Unknown server packet type received: id=" << (void*)(item->PacketSubType()) << std::endl ;
exit(-1) ;
}
if(direction == RsTurtleGenericTunnelItem::DIRECTION_CLIENT)
switch(item->PacketSubType())
{
case RS_TURTLE_SUBTYPE_FILE_DATA : handleRecvFileData(dynamic_cast<RsTurtleFileDataItem *>(item)) ;
break ;
default:
std::cerr << "Unknown client packet type received: id=" << (void*)(item->PacketSubType()) << std::endl ;
exit(-1) ;
}
delete item ;
}
void p3turtle::handleRecvFileRequest(RsTurtleFileRequestItem *item)
{
@ -742,9 +821,9 @@ void p3turtle::handleRecvFileRequest(RsTurtleFileRequestItem *item)
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
std::map<TurtleTunnelId,TurtleTunnel>::iterator it(_local_tunnels.find(item->tunnel_id)) ;
std::map<TurtleTunnelId,TurtleTunnel>::iterator it2(_local_tunnels.find(item->tunnel_id)) ;
if(it == _local_tunnels.end())
if(it2 == _local_tunnels.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle: got file request with unknown tunnel id " << (void*)item->tunnel_id << std::endl ;
@ -752,36 +831,19 @@ void p3turtle::handleRecvFileRequest(RsTurtleFileRequestItem *item)
return ;
}
TurtleTunnel& tunnel(it->second) ;
// Let's figure out whether this reuqest is for us or not.
if(tunnel.local_dst == mConnMgr->getOwnId()) // Yes, we have to pass on the request to the data multiplexer
{
std::map<TurtleFileHash,FileInfo>::const_iterator it(_outgoing_file_hashes.find(tunnel.hash)) ;
TurtleTunnel& tunnel(it2->second) ;
std::map<TurtleFileHash,FileInfo>::const_iterator it(_outgoing_file_hashes.find(tunnel.hash)) ;
#ifdef P3TURTLE_DEBUG
assert(!tunnel.hash.empty()) ;
assert(it != _outgoing_file_hashes.end()) ;
assert(!tunnel.hash.empty()) ;
assert(it != _outgoing_file_hashes.end()) ;
std::cerr << " This is an endpoint for this file request." << std::endl ;
std::cerr << " Forwarding data request to the multiplexer." << std::endl ;
std::cerr << " using peer_id=" << tunnel.vpid << ", hash=" << tunnel.hash << std::endl ;
std::cerr << " This is an endpoint for this file request." << std::endl ;
std::cerr << " Forwarding data request to the multiplexer." << std::endl ;
std::cerr << " using peer_id=" << tunnel.vpid << ", hash=" << tunnel.hash << std::endl ;
#endif
// _ft_server->getMultiplexer()->recvDataRequest(tunnel.vpid,tunnel.hash,it->second.size,item->chunk_offset,item->chunk_size) ;
//
size = it->second.size ;
vpid = tunnel.vpid ;
hash = tunnel.hash ;
}
else // No, it's a request we should forward down the pipe.
{
RsTurtleFileRequestItem *res_item = new RsTurtleFileRequestItem(*item) ;
res_item->PeerId(tunnel.local_dst) ;
sendItem(res_item) ;
return ;
}
size = it->second.size ;
vpid = tunnel.vpid ;
hash = tunnel.hash ;
}
// This call is voluntarily off-mutex gards because it can cause cross mutex locks with the multiplexer.
@ -796,18 +858,16 @@ void p3turtle::handleRecvFileData(RsTurtleFileDataItem *item)
std::cerr << "p3Turtle: received file data item:" << std::endl ;
item->print(std::cerr,1) ;
#endif
// This is a new request. Let's add it to the request map, and forward it to
// open peers.
TurtleVirtualPeerId vpid ;
uint64_t size ;
TurtleFileHash hash ;
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
std::map<TurtleTunnelId,TurtleTunnel>::iterator it(_local_tunnels.find(item->tunnel_id)) ;
std::map<TurtleTunnelId,TurtleTunnel>::iterator it2(_local_tunnels.find(item->tunnel_id)) ;
if(it == _local_tunnels.end())
if(it2 == _local_tunnels.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle: got file data with unknown tunnel id " << (void*)item->tunnel_id << std::endl ;
@ -815,68 +875,40 @@ void p3turtle::handleRecvFileData(RsTurtleFileDataItem *item)
return ;
}
TurtleTunnel& tunnel(it->second) ;
// Only file data transfer updates tunnels time_stamp field, to avoid maintaining tunnel that are incomplete.
tunnel.time_stamp = time(NULL) ;
// Let's figure out whether this reuqest is for us or not.
if(tunnel.local_src == mConnMgr->getOwnId()) // Yes, we have to pass on the data to the multiplexer
{
std::map<TurtleFileHash,TurtleFileHashInfo>::iterator it( _incoming_file_hashes.find(tunnel.hash) ) ;
TurtleTunnel& tunnel(it2->second) ;
std::map<TurtleFileHash,TurtleFileHashInfo>::iterator it( _incoming_file_hashes.find(tunnel.hash) ) ;
#ifdef P3TURTLE_DEBUG
assert(!tunnel.hash.empty()) ;
assert(!tunnel.hash.empty()) ;
#endif
if(it==_incoming_file_hashes.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << "No tunnel for incoming data. Maybe the tunnel is being closed." << std::endl ;
#endif
return ;
}
const TurtleFileHashInfo& hash_info(it->second) ;
#ifdef P3TURTLE_DEBUG
std::cerr << " This is an endpoint for this data chunk." << std::endl ;
std::cerr << " Forwarding data to the multiplexer." << std::endl ;
std::cerr << " using peer_id=" << tunnel.vpid << ", hash=" << tunnel.hash << std::endl ;
#endif
//_ft_server->getMultiplexer()->recvData(tunnel.vpid,tunnel.hash,hash_info.size,item->chunk_offset,item->chunk_size,item->chunk_data) ;
vpid = tunnel.vpid ;
hash = tunnel.hash ;
size = hash_info.size ;
// also update the hash time stamp to show that it's actually being downloaded.
it->second.time_stamp = time(NULL) ;
}
else // No, it's a request we should forward down the pipe.
if(it==_incoming_file_hashes.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << " Forwarding data chunk to peer " << tunnel.local_src << std::endl ;
std::cerr << "No tunnel for incoming data. Maybe the tunnel is being closed." << std::endl ;
#endif
RsTurtleFileDataItem *res_item = new RsTurtleFileDataItem(*item) ;
res_item->chunk_data = malloc(res_item->chunk_size) ;
if(res_item->chunk_data == NULL)
{
std::cerr << "p3turtle: Warning: failed malloc of " << res_item->chunk_size << " bytes for received data packet." << std::endl ;
return ;
}
memcpy(res_item->chunk_data,item->chunk_data,res_item->chunk_size) ;
res_item->PeerId(tunnel.local_src) ;
sendItem(res_item) ;
return ;
}
const TurtleFileHashInfo& hash_info(it->second) ;
#ifdef P3TURTLE_DEBUG
std::cerr << " This is an endpoint for this data chunk." << std::endl ;
std::cerr << " Forwarding data to the multiplexer." << std::endl ;
std::cerr << " using peer_id=" << tunnel.vpid << ", hash=" << tunnel.hash << std::endl ;
#endif
//_ft_server->getMultiplexer()->recvData(tunnel.vpid,tunnel.hash,hash_info.size,item->chunk_offset,item->chunk_size,item->chunk_data) ;
vpid = tunnel.vpid ;
hash = tunnel.hash ;
size = hash_info.size ;
// also update the hash time stamp to show that it's actually being downloaded.
it->second.time_stamp = time(NULL) ;
}
_ft_server->getMultiplexer()->recvData(vpid,hash,size,item->chunk_offset,item->chunk_size,item->chunk_data) ;
item->chunk_data = NULL ; // this prevents deletion in the destructor of RsFileDataItem, because data will be deleted
// down _ft_server->getMultiplexer()->recvData()...in ftTransferModule::recvFileData
}
void p3turtle::handleRecvFileMap(RsTurtleFileMapItem *item)
{
#ifdef P3TURTLE_DEBUG
@ -886,61 +918,42 @@ void p3turtle::handleRecvFileMap(RsTurtleFileMapItem *item)
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
std::map<TurtleTunnelId,TurtleTunnel>::iterator it(_local_tunnels.find(item->tunnel_id)) ;
std::map<TurtleTunnelId,TurtleTunnel>::iterator it2(_local_tunnels.find(item->tunnel_id)) ;
if(it == _local_tunnels.end())
if(it2 == _local_tunnels.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle: got file map with unknown tunnel id " << (void*)item->tunnel_id << std::endl ;
std::cerr << "p3turtle: got file data with unknown tunnel id " << (void*)item->tunnel_id << std::endl ;
#endif
return ;
}
TurtleTunnel& tunnel(it->second) ;
TurtleTunnel& tunnel(it2->second) ;
// Only file data transfer updates tunnels time_stamp field, to avoid maintaining tunnel that are incomplete.
tunnel.time_stamp = time(NULL) ;
// Let's figure out whether this reuqest is for us or not.
if(tunnel.local_src == mConnMgr->getOwnId()) // Yes, we have to pass on the data to the multiplexer
{
std::map<TurtleFileHash,TurtleFileHashInfo>::iterator it( _incoming_file_hashes.find(tunnel.hash) ) ;
std::map<TurtleFileHash,TurtleFileHashInfo>::iterator it( _incoming_file_hashes.find(tunnel.hash) ) ;
#ifdef P3TURTLE_DEBUG
assert(!tunnel.hash.empty()) ;
assert(!tunnel.hash.empty()) ;
#endif
if(it==_incoming_file_hashes.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << "No tunnel for incoming data. Maybe the tunnel is being closed." << std::endl ;
#endif
return ;
}
const TurtleFileHashInfo& hash_info(it->second) ;
#ifdef P3TURTLE_DEBUG
std::cerr << " This is an endpoint for this file map." << std::endl ;
std::cerr << " Forwarding data to the multiplexer." << std::endl ;
std::cerr << " using peer_id=" << tunnel.vpid << ", hash=" << tunnel.hash << std::endl ;
#endif
// also update the hash time stamp to show that it's actually being downloaded.
it->second.time_stamp = time(NULL) ;
// we should check that there is no backward call to the turtle router!
//
_ft_server->getMultiplexer()->recvFileMap(tunnel.vpid,tunnel.hash,item->chunk_size,item->nb_chunks,item->compressed_map) ;
}
else // No, it's a request we should forward down the pipe.
if(it==_incoming_file_hashes.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << " Forwarding file map to peer " << tunnel.local_src << std::endl ;
std::cerr << "No tunnel for incoming data. Maybe the tunnel is being closed." << std::endl ;
#endif
RsTurtleFileMapItem *res_item = new RsTurtleFileMapItem(*item) ;
res_item->PeerId(tunnel.local_src) ;
sendItem(res_item) ;
return ;
}
const TurtleFileHashInfo& hash_info(it->second) ;
#ifdef P3TURTLE_DEBUG
std::cerr << " This is an endpoint for this file map." << std::endl ;
std::cerr << " Forwarding data to the multiplexer." << std::endl ;
std::cerr << " using peer_id=" << tunnel.vpid << ", hash=" << tunnel.hash << std::endl ;
#endif
// also update the hash time stamp to show that it's actually being downloaded.
it->second.time_stamp = time(NULL) ;
// we should check that there is no backward call to the turtle router!
//
_ft_server->getMultiplexer()->recvFileMap(tunnel.vpid,tunnel.hash,item->chunk_size,item->nb_chunks,item->compressed_map) ;
}
}
@ -962,8 +975,6 @@ void p3turtle::sendDataRequest(const std::string& peerId, const std::string& has
TurtleTunnelId tunnel_id = it->second ;
TurtleTunnel& tunnel(_local_tunnels[tunnel_id]) ;
// tunnel.time_stamp = time(NULL) ;
#ifdef P3TURTLE_DEBUG
assert(hash == tunnel.hash) ;
#endif
@ -1157,7 +1168,9 @@ void p3turtle::handleTunnelRequest(RsTurtleOpenTunnelItem *item)
item->print(std::cerr,0) ;
#endif
// If the item contains an already handled tunnel request, give up. This
// happens when the same tunnel request gets relayed by different peers
// happens when the same tunnel request gets relayed by different peers. We
// have to be very careful here, not to call ftController while mTurtleMtx is
// locked.
//
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
@ -1169,62 +1182,71 @@ void p3turtle::handleTunnelRequest(RsTurtleOpenTunnelItem *item)
#endif
return ;
}
// This is a new request. Let's add it to the request map, and forward it to
// open peers.
// This is a new request. Let's add it to the request map, and forward
// it to open peers, while the mutex is locked, so no-one can trigger the
// lock before the data is consistent.
TurtleRequestInfo& req( _tunnel_requests_origins[item->request_id] ) ;
req.origin = item->PeerId() ;
req.time_stamp = time(NULL) ;
}
// If it's not for us, perform a local search. If something found, forward the search result back.
// If it's not for us, perform a local search. If something found, forward the search result back.
// We're off-mutex here.
if(item->PeerId() != mConnMgr->getOwnId())
bool found = false ;
FileInfo info ;
if(item->PeerId() != mConnMgr->getOwnId())
{
#ifdef P3TURTLE_DEBUG
std::cerr << " Request not from us. Performing local search" << std::endl ;
#endif
found = (_sharing_strategy != SHARE_FRIENDS_ONLY || item->depth < 2) && performLocalHashSearch(item->file_hash,info) ;
}
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
if(found)
{
FileInfo info ;
#ifdef P3TURTLE_DEBUG
std::cerr << " Request not from us. Performing local search" << std::endl ;
std::cerr << " Local hash found. Sending tunnel ok to origin (" << item->PeerId() << ")." << std::endl ;
#endif
if((_sharing_strategy != SHARE_FRIENDS_ONLY || item->depth < 2) && performLocalHashSearch(item->file_hash,info))
{
#ifdef P3TURTLE_DEBUG
std::cerr << " Local hash found. Sending tunnel ok to origin (" << item->PeerId() << ")." << std::endl ;
#endif
// Send back tunnel ok to the same guy
//
RsTurtleTunnelOkItem *res_item = new RsTurtleTunnelOkItem ;
// Send back tunnel ok to the same guy
//
RsTurtleTunnelOkItem *res_item = new RsTurtleTunnelOkItem ;
res_item->request_id = item->request_id ;
res_item->tunnel_id = item->partial_tunnel_id ^ generatePersonalFilePrint(item->file_hash,false) ;
res_item->PeerId(item->PeerId()) ;
res_item->request_id = item->request_id ;
res_item->tunnel_id = item->partial_tunnel_id ^ generatePersonalFilePrint(item->file_hash,false) ;
res_item->PeerId(item->PeerId()) ;
sendItem(res_item) ;
sendItem(res_item) ;
// Note in the tunnels list that we have an ending tunnel here.
TurtleTunnel tt ;
tt.local_src = item->PeerId() ;
tt.hash = item->file_hash ;
tt.local_dst = mConnMgr->getOwnId() ; // this means us
tt.time_stamp = time(NULL) ;
// Note in the tunnels list that we have an ending tunnel here.
TurtleTunnel tt ;
tt.local_src = item->PeerId() ;
tt.hash = item->file_hash ;
tt.local_dst = mConnMgr->getOwnId() ; // this means us
tt.time_stamp = time(NULL) ;
_local_tunnels[res_item->tunnel_id] = tt ;
_local_tunnels[res_item->tunnel_id] = tt ;
// We add a virtual peer for that tunnel+hash combination.
//
addDistantPeer(item->file_hash,res_item->tunnel_id) ;
// We add a virtual peer for that tunnel+hash combination.
//
addDistantPeer(item->file_hash,res_item->tunnel_id) ;
// Store the size of the file, to be able to re-form data requests to the multiplexer.
//
_outgoing_file_hashes[item->file_hash] = info ;
// Store the size of the file, to be able to re-form data requests to the multiplexer.
//
_outgoing_file_hashes[item->file_hash] = info ;
// We return straight, because when something is found, there's no need to digg a tunnel further.
return ;
}
#ifdef P3TURTLE_DEBUG
else
std::cerr << " No hash found locally, or local file not allowed for distant peers. Forwarding. " << std::endl ;
#endif
// We return straight, because when something is found, there's no need to digg a tunnel further.
return ;
}
#ifdef P3TURTLE_DEBUG
else
std::cerr << " No hash found locally, or local file not allowed for distant peers. Forwarding. " << std::endl ;
#endif
}
// If search depth not too large, also forward this search request to all other peers.
@ -1538,6 +1560,9 @@ void p3turtle::returnSearchResult(RsTurtleSearchResultItem *item)
rsicontrol->getNotify().notifyTurtleSearchResult(item->request_id,item->result) ;
}
/// Warning: this function should never be called while the turtle mutex is locked.
/// Otherwize this is a possible source of cross-lock with the File mutex.
//
bool p3turtle::performLocalHashSearch(const TurtleFileHash& hash,FileInfo& info)
{
return rsFiles->FileDetails(hash, RS_FILE_HINTS_LOCAL | RS_FILE_HINTS_SPEC_ONLY | RS_FILE_HINTS_DOWNLOAD, info);