Added ability for turtle router to take client services

- new class RsTurtleClientService, from which should inherit the services
- turtle serialisation takes client services for deserialising
- added a client to p3turtle::monitorTunnels()
- added a generic turtle item to pass on arbitrary data

Moved ftServer as a subclass of RsTurtleClientService. 
- moved file transfer items that inherited from RsTurtleGenericTunnelItem into a separate file
- moved deserialisation triage into ftServer
- moved sending/receiving methods in ftServer (much less code, much more consistent!)

One bug remains: the TR cannot work as a server.

Changes should be 100% backward compatible.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-GenericTunneling@6293 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2013-04-06 09:21:01 +00:00
parent 270abcdc94
commit 0abb21d962
14 changed files with 1278 additions and 1504 deletions

View file

@ -111,8 +111,9 @@ p3turtle::p3turtle(p3LinkMgr *lm,ftServer *fs)
_ft_controller = fs->getController() ;
_random_bias = RSRandom::random_u32() ;
_serialiser = new RsTurtleSerialiser() ;
addSerialType(new RsTurtleSerialiser());
addSerialType(_serialiser);
_last_clean_time = 0 ;
_last_tunnel_management_time = 0 ;
@ -995,7 +996,6 @@ void p3turtle::routeGenericTunnelItem(RsTurtleGenericTunnelItem *item)
std::cerr << "p3Turtle: treating generic tunnel item:" << std::endl ;
item->print(std::cerr,1) ;
#endif
RsTurtleGenericTunnelItem::Direction direction = item->travelingDirection() ;
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
@ -1021,9 +1021,16 @@ void p3turtle::routeGenericTunnelItem(RsTurtleGenericTunnelItem *item)
tunnel.transfered_bytes += static_cast<RsTurtleItem*>(item)->serial_size() ;
if(item->PeerId() == tunnel.local_dst)
item->setTravelingDirection(RsTurtleGenericTunnelItem::DIRECTION_CLIENT) ;
else if(item->PeerId() == tunnel.local_src)
item->setTravelingDirection(RsTurtleGenericTunnelItem::DIRECTION_SERVER) ;
else
std::cerr << "(EE) critical error in p3turtle::routeGenericTunnelItem(): item mismatches tunnel src/dst ids." << std::endl;
// Let's figure out whether this packet is for us or not.
if(direction == RsTurtleGenericTunnelItem::DIRECTION_CLIENT && tunnel.local_src != mLinkMgr->getOwnId())
if(item->PeerId() == tunnel.local_dst && tunnel.local_src != mLinkMgr->getOwnId()) //direction == RsTurtleGenericTunnelItem::DIRECTION_CLIENT &&
{
#ifdef P3TURTLE_DEBUG
std::cerr << " Forwarding generic item to peer " << tunnel.local_src << std::endl ;
@ -1032,14 +1039,16 @@ void p3turtle::routeGenericTunnelItem(RsTurtleGenericTunnelItem *item)
_traffic_info_buffer.unknown_updn_Bps += static_cast<RsTurtleItem*>(item)->serial_size() ;
if(dynamic_cast<RsTurtleFileDataItem*>(item) != NULL)
item->setPriorityLevel(QOS_PRIORITY_RS_TURTLE_FORWARD_FILE_DATA) ;
// This has been disabled for compilation reasons. Not sure we actually need it.
//
//if(dynamic_cast<RsTurtleFileDataItem*>(item) != NULL)
// item->setPriorityLevel(QOS_PRIORITY_RS_TURTLE_FORWARD_FILE_DATA) ;
sendItem(item) ;
return ;
}
if(direction == RsTurtleGenericTunnelItem::DIRECTION_SERVER && tunnel.local_dst != mLinkMgr->getOwnId())
if(item->PeerId() == tunnel.local_src && tunnel.local_dst != mLinkMgr->getOwnId()) //direction == RsTurtleGenericTunnelItem::DIRECTION_SERVER &&
{
#ifdef P3TURTLE_DEBUG
std::cerr << " Forwarding generic item to peer " << tunnel.local_dst << std::endl ;
@ -1057,198 +1066,31 @@ void p3turtle::routeGenericTunnelItem(RsTurtleGenericTunnelItem *item)
// This is done off-mutex, to avoid various deadlocks
//
switch(item->PacketSubType())
{
case RS_TURTLE_SUBTYPE_FILE_REQUEST: handleRecvFileRequest(dynamic_cast<RsTurtleFileRequestItem *>(item)) ;
break ;
case RS_TURTLE_SUBTYPE_FILE_DATA : handleRecvFileData(dynamic_cast<RsTurtleFileDataItem *>(item)) ;
break ;
case RS_TURTLE_SUBTYPE_FILE_MAP : handleRecvFileMap(dynamic_cast<RsTurtleFileMapItem *>(item)) ;
break ;
case RS_TURTLE_SUBTYPE_FILE_MAP_REQUEST: handleRecvFileMapRequest(dynamic_cast<RsTurtleFileMapRequestItem *>(item)) ;
break ;
case RS_TURTLE_SUBTYPE_FILE_CRC : handleRecvFileCRC32Map(dynamic_cast<RsTurtleFileCrcItem *>(item)) ;
break ;
case RS_TURTLE_SUBTYPE_FILE_CRC_REQUEST: handleRecvFileCRC32MapRequest(dynamic_cast<RsTurtleFileCrcRequestItem *>(item)) ;
break ;
case RS_TURTLE_SUBTYPE_CHUNK_CRC : handleRecvChunkCRC(dynamic_cast<RsTurtleChunkCrcItem *>(item)) ;
break ;
case RS_TURTLE_SUBTYPE_CHUNK_CRC_REQUEST: handleRecvChunkCRCRequest(dynamic_cast<RsTurtleChunkCrcRequestItem *>(item)) ;
break ;
default:
std::cerr << "WARNING: Unknown packet type received: id=" << reinterpret_cast<void*>(item->PacketSubType()) << ". Is somebody trying to poison you ?" << std::endl ;
#ifdef P3TURTLE_DEBUG
exit(-1) ;
#endif
}
handleRecvGenericTunnelItem(item) ;
delete item ;
}
void p3turtle::handleRecvFileRequest(RsTurtleFileRequestItem *item)
void p3turtle::handleRecvGenericTunnelItem(RsTurtleGenericTunnelItem *item)
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3Turtle: received file request item:" << std::endl ;
std::cerr << "p3Turtle: received Generic tunnel item:" << std::endl ;
item->print(std::cerr,1) ;
#endif
// This is a new request. Let's add it to the request map, and forward it to
// open peers.
TurtleVirtualPeerId vpid ;
uint64_t size ;
TurtleFileHash hash ;
std::string hash ;
std::string vpid ;
RsTurtleClientService *service ;
// Now lock the router
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
std::map<TurtleTunnelId,TurtleTunnel>::iterator it2(_local_tunnels.find(item->tunnel_id)) ;
std::map<TurtleTunnelId,TurtleTunnel>::iterator it2(_local_tunnels.find(item->tunnelId())) ;
if(it2 == _local_tunnels.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle: got file request with unknown tunnel id " << (void*)item->tunnel_id << std::endl ;
#endif
return ;
}
TurtleTunnel& tunnel(it2->second) ;
std::map<TurtleFileHash,std::string>::const_iterator it(_outgoing_file_hashes.find(tunnel.hash)) ;
#ifdef P3TURTLE_DEBUG
assert(!tunnel.hash.empty()) ;
assert(it != _outgoing_file_hashes.end()) ;
std::cerr << " This is an endpoint for this file request." << std::endl ;
std::cerr << " Forwarding data request to the multiplexer." << std::endl ;
std::cerr << " using peer_id=" << tunnel.vpid << ", hash=" << tunnel.hash << std::endl ;
#endif
size = 0 ; //it->second.size ; Not used by ftDataMultiplex actually
vpid = tunnel.vpid ;
hash = tunnel.hash ;
}
// This call is voluntarily off-mutex gards because it can cause cross mutex locks with the multiplexer.
// (Yeah, this bug was a shity hard one to catch).
//
_ft_server->getMultiplexer()->recvDataRequest(vpid,hash,size,item->chunk_offset,item->chunk_size) ;
}
void p3turtle::handleRecvFileData(RsTurtleFileDataItem *item)
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3Turtle: received file data item:" << std::endl ;
item->print(std::cerr,1) ;
#endif
TurtleVirtualPeerId vpid ;
uint64_t size ;
TurtleFileHash hash ;
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
_traffic_info_buffer.data_dn_Bps += static_cast<RsTurtleItem*>(item)->serial_size() ;
std::map<TurtleTunnelId,TurtleTunnel>::iterator it2(_local_tunnels.find(item->tunnel_id)) ;
if(it2 == _local_tunnels.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle: got file data with unknown tunnel id " << (void*)item->tunnel_id << std::endl ;
#endif
return ;
}
TurtleTunnel& tunnel(it2->second) ;
std::map<TurtleFileHash,TurtleHashInfo>::iterator it( _incoming_file_hashes.find(tunnel.hash) ) ;
#ifdef P3TURTLE_DEBUG
assert(!tunnel.hash.empty()) ;
#endif
if(it==_incoming_file_hashes.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << "No tunnel for incoming data. Maybe the tunnel is being closed." << std::endl ;
#endif
return ;
}
#ifdef P3TURTLE_DEBUG
std::cerr << " This is an endpoint for this data chunk." << std::endl ;
std::cerr << " Forwarding data to the multiplexer." << std::endl ;
std::cerr << " using peer_id=" << tunnel.vpid << ", hash=" << tunnel.hash << std::endl ;
#endif
//_ft_server->getMultiplexer()->recvData(tunnel.vpid,tunnel.hash,hash_info.size,item->chunk_offset,item->chunk_size,item->chunk_data) ;
vpid = tunnel.vpid ;
hash = tunnel.hash ;
size = 0 ; // Not used by ftDataMultiplex, see ftDataMultiplex:handleRecvData()
// also update the hash time stamp to show that it's actually being downloaded.
//it->second.time_stamp = time(NULL) ;
}
_ft_server->getMultiplexer()->recvData(vpid,hash,size,item->chunk_offset,item->chunk_size,item->chunk_data) ;
item->chunk_data = NULL ; // this prevents deletion in the destructor of RsFileDataItem, because data will be deleted
// down _ft_server->getMultiplexer()->recvData()...in ftTransferModule::recvFileData
}
void p3turtle::handleRecvFileMapRequest(RsTurtleFileMapRequestItem *item)
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3Turtle: received file Map item:" << std::endl ;
item->print(std::cerr,1) ;
#endif
std::string hash,vpid ;
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
std::map<TurtleTunnelId,TurtleTunnel>::iterator it2(_local_tunnels.find(item->tunnel_id)) ;
if(it2 == _local_tunnels.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle: got file data with unknown tunnel id " << (void*)item->tunnel_id << std::endl ;
#endif
return ;
}
TurtleTunnel& tunnel(it2->second) ;
#ifdef P3TURTLE_DEBUG
assert(!tunnel.hash.empty()) ;
std::cerr << " This is an endpoint for this file map request." << std::endl ;
std::cerr << " Forwarding data to the multiplexer." << std::endl ;
std::cerr << " using peer_id=" << tunnel.vpid << ", hash=" << tunnel.hash << std::endl ;
#endif
// we should check that there is no backward call to the turtle router!
//
hash = tunnel.hash ;
vpid = tunnel.vpid ;
}
_ft_server->getMultiplexer()->recvChunkMapRequest(vpid,hash,item->direction == RsTurtleGenericTunnelItem::DIRECTION_CLIENT) ;
}
void p3turtle::handleRecvFileMap(RsTurtleFileMapItem *item)
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3Turtle: received file Map item:" << std::endl ;
item->print(std::cerr,1) ;
#endif
std::string hash,vpid ;
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
std::map<TurtleTunnelId,TurtleTunnel>::iterator it2(_local_tunnels.find(item->tunnel_id)) ;
if(it2 == _local_tunnels.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle: got file data with unknown tunnel id " << (void*)item->tunnel_id << std::endl ;
std::cerr << "p3turtle: got file CRC32 map with unknown tunnel id " << (void*)item->tunnelId() << std::endl ;
#endif
return ;
}
@ -1266,161 +1108,29 @@ void p3turtle::handleRecvFileMap(RsTurtleFileMapItem *item)
//
vpid = tunnel.vpid ;
hash = tunnel.hash ;
}
_ft_server->getMultiplexer()->recvChunkMap(vpid,hash,item->compressed_map,item->direction == RsTurtleGenericTunnelItem::DIRECTION_CLIENT) ;
}
void p3turtle::handleRecvFileCRC32MapRequest(RsTurtleFileCrcRequestItem *item)
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3Turtle: received file CRC32 Map Request item:" << std::endl ;
item->print(std::cerr,1) ;
#endif
std::string hash,vpid ;
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
std::map<TurtleFileHash,TurtleHashInfo>::const_iterator it = _incoming_file_hashes.find(hash) ;
std::map<TurtleTunnelId,TurtleTunnel>::iterator it2(_local_tunnels.find(item->tunnel_id)) ;
if(it2 == _local_tunnels.end())
if(it == _incoming_file_hashes.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle: got file data with unknown tunnel id " << (void*)item->tunnel_id << std::endl ;
#endif
std::cerr << "p3turtle::handleRecvGenericTunnelItem(): hash " << hash << " for tunnel " << (void*)(it2->first) << " has no attached service! Dropping the item. This is a serious consistency error." << std::endl;
return ;
}
TurtleTunnel& tunnel(it2->second) ;
#ifdef P3TURTLE_DEBUG
assert(!tunnel.hash.empty()) ;
std::cerr << " This is an endpoint for this file crc request." << std::endl ;
std::cerr << " Forwarding data to the multiplexer." << std::endl ;
std::cerr << " using peer_id=" << tunnel.vpid << ", hash=" << tunnel.hash << std::endl ;
#endif
// we should check that there is no backward call to the turtle router!
//
hash = tunnel.hash ;
vpid = tunnel.vpid ;
service = it->second.service ;
}
_ft_server->getMultiplexer()->recvCRC32MapRequest(vpid,hash) ;
service->receiveTurtleData(item,hash,vpid,item->travelingDirection()) ;
}
void p3turtle::handleRecvChunkCRC(RsTurtleChunkCrcItem *item)
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3Turtle: received file CRC32 Map Request item:" << std::endl ;
item->print(std::cerr,1) ;
#endif
std::string hash,vpid ;
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
std::map<TurtleTunnelId,TurtleTunnel>::iterator it2(_local_tunnels.find(item->tunnel_id)) ;
if(it2 == _local_tunnels.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle: chunk crc request with unknown tunnel id " << (void*)item->tunnel_id << std::endl ;
#endif
return ;
}
TurtleTunnel& tunnel(it2->second) ;
#ifdef P3TURTLE_DEBUG
assert(!tunnel.hash.empty()) ;
std::cerr << " This is an endpoint for this file crc request." << std::endl ;
std::cerr << " Forwarding data to the multiplexer." << std::endl ;
std::cerr << " using peer_id=" << tunnel.vpid << ", hash=" << tunnel.hash << std::endl ;
#endif
// we should check that there is no backward call to the turtle router!
//
hash = tunnel.hash ;
vpid = tunnel.vpid ;
}
_ft_server->getMultiplexer()->recvSingleChunkCRC(vpid,hash,item->chunk_number,item->check_sum) ;
}
void p3turtle::handleRecvChunkCRCRequest(RsTurtleChunkCrcRequestItem *item)
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3Turtle: received file CRC32 Map Request item:" << std::endl ;
item->print(std::cerr,1) ;
#endif
std::string hash,vpid ;
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
std::map<TurtleTunnelId,TurtleTunnel>::iterator it2(_local_tunnels.find(item->tunnel_id)) ;
if(it2 == _local_tunnels.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle: chunk crc request with unknown tunnel id " << (void*)item->tunnel_id << std::endl ;
#endif
return ;
}
TurtleTunnel& tunnel(it2->second) ;
#ifdef P3TURTLE_DEBUG
assert(!tunnel.hash.empty()) ;
std::cerr << " This is an endpoint for this file crc request." << std::endl ;
std::cerr << " Forwarding data to the multiplexer." << std::endl ;
std::cerr << " using peer_id=" << tunnel.vpid << ", hash=" << tunnel.hash << std::endl ;
#endif
// we should check that there is no backward call to the turtle router!
//
hash = tunnel.hash ;
vpid = tunnel.vpid ;
}
_ft_server->getMultiplexer()->recvSingleChunkCRCRequest(vpid,hash,item->chunk_number) ;
}
void p3turtle::handleRecvFileCRC32Map(RsTurtleFileCrcItem *item)
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3Turtle: received file CRC32 Map item:" << std::endl ;
item->print(std::cerr,1) ;
#endif
std::string hash,vpid ;
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
std::map<TurtleTunnelId,TurtleTunnel>::iterator it2(_local_tunnels.find(item->tunnel_id)) ;
if(it2 == _local_tunnels.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle: got file CRC32 map with unknown tunnel id " << (void*)item->tunnel_id << std::endl ;
#endif
return ;
}
TurtleTunnel& tunnel(it2->second) ;
#ifdef P3TURTLE_DEBUG
assert(!tunnel.hash.empty()) ;
std::cerr << " This is an endpoint for this file map." << std::endl ;
std::cerr << " Forwarding data to the multiplexer." << std::endl ;
std::cerr << " using peer_id=" << tunnel.vpid << ", hash=" << tunnel.hash << std::endl ;
#endif
// We should check that there is no backward call to the turtle router!
//
vpid = tunnel.vpid ;
hash = tunnel.hash ;
}
_ft_server->getMultiplexer()->recvCRC32Map(vpid,hash,item->crc_map) ;
}
// Send a data request into the correct tunnel for the given file hash
void p3turtle::sendDataRequest(const std::string& peerId, const std::string& , uint64_t, uint64_t offset, uint32_t chunksize)
//
void p3turtle::sendTurtleData(const std::string& virtual_peer_id,RsTurtleGenericTunnelItem *item)
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
// get the proper tunnel for this file hash and peer id.
std::map<TurtleVirtualPeerId,TurtleTunnelId>::const_iterator it(_virtual_peers.find(peerId)) ;
std::map<TurtleVirtualPeerId,TurtleTunnelId>::const_iterator it(_virtual_peers.find(virtual_peer_id)) ;
if(it == _virtual_peers.end())
{
@ -1432,255 +1142,31 @@ void p3turtle::sendDataRequest(const std::string& peerId, const std::string& , u
TurtleTunnelId tunnel_id = it->second ;
TurtleTunnel& tunnel(_local_tunnels[tunnel_id]) ;
RsTurtleFileRequestItem *item = new RsTurtleFileRequestItem ;
item->tunnel_id = tunnel_id ; // we should randomly select a tunnel, or something more clever.
item->chunk_offset = offset ;
item->chunk_size = chunksize ;
item->PeerId(tunnel.local_dst) ;
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle: sending file req (chunksize=" << item->chunk_size << ", offset=" << item->chunk_offset << ", hash=0x" << tunnel.hash << ") through tunnel " << (void*)item->tunnel_id << ", next peer=" << tunnel.local_dst << std::endl ;
#endif
sendItem(item) ;
}
// Send file data into the correct tunnel for the given file hash
void p3turtle::sendFileData(const std::string& peerId, const std::string& , uint64_t, uint64_t offset, uint32_t chunksize, void *data)
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
// get the proper tunnel for this file hash and peer id.
std::map<TurtleVirtualPeerId,TurtleTunnelId>::const_iterator it(_virtual_peers.find(peerId)) ;
if(it == _virtual_peers.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle::sendData: cannot find virtual peer " << peerId << " in VP list." << std::endl ;
#endif
return ;
}
TurtleTunnelId tunnel_id = it->second ;
TurtleTunnel& tunnel(_local_tunnels[tunnel_id]) ;
tunnel.time_stamp = time(NULL) ;
RsTurtleFileDataItem *item = new RsTurtleFileDataItem ;
item->tunnel_id = tunnel_id ;
item->chunk_offset = offset ;
item->chunk_size = chunksize ;
item->chunk_data = malloc(chunksize) ;
tunnel.transfered_bytes += static_cast<RsTurtleItem*>(item)->serial_size();
if(item->chunk_data == NULL)
{
std::cerr << "p3turtle: Warning: failed malloc of " << chunksize << " bytes for sending data packet." << std::endl ;
delete item;
return ;
}
memcpy(item->chunk_data,(void*)((uint8_t*)data),chunksize) ;
item->PeerId(tunnel.local_src) ;
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle: sending file data (chunksize=" << item->chunk_size << ", offset=" << item->chunk_offset << ", hash=0x" << tunnel.hash << ") through tunnel " << (void*)item->tunnel_id << ", next peer=" << tunnel.local_src << std::endl ;
#endif
_traffic_info_buffer.data_up_Bps += static_cast<RsTurtleItem*>(item)->serial_size() ;
sendItem(item) ;
}
void p3turtle::sendChunkMapRequest(const std::string& peerId,const std::string& ,bool is_client)
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
// get the proper tunnel for this file hash and peer id.
std::map<TurtleVirtualPeerId,TurtleTunnelId>::const_iterator it(_virtual_peers.find(peerId)) ;
if(it == _virtual_peers.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle::senddataRequest: cannot find virtual peer " << peerId << " in VP list." << std::endl ;
#endif
return ;
}
TurtleTunnelId tunnel_id = it->second ;
TurtleTunnel& tunnel(_local_tunnels[tunnel_id]) ;
RsTurtleFileMapRequestItem *item = new RsTurtleFileMapRequestItem ;
item->tunnel_id = tunnel_id ;
std::string ownid = mLinkMgr->getOwnId() ;
if(tunnel.local_src == ownid)
{
assert(!is_client) ;
item->direction = RsTurtleGenericTunnelItem::DIRECTION_SERVER ;
item->setTravelingDirection(RsTurtleGenericTunnelItem::DIRECTION_SERVER) ;
item->PeerId(tunnel.local_dst) ;
_traffic_info_buffer.data_dn_Bps += item->serial_size() ;
}
else if(tunnel.local_dst == ownid)
{
assert(is_client) ;
item->direction = RsTurtleGenericTunnelItem::DIRECTION_CLIENT ;
item->setTravelingDirection(RsTurtleGenericTunnelItem::DIRECTION_CLIENT) ;
item->PeerId(tunnel.local_src) ;
_traffic_info_buffer.data_up_Bps += item->serial_size() ;
}
else
std::cerr << "p3turtle::sendChunkMapRequest: consistency error!" << std::endl ;
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle: sending chunk map req to peer " << peerId << ", hash=0x" << tunnel.hash << ") through tunnel " << (void*)item->tunnel_id << ", next peer=" << item->PeerId() << std::endl ;
#endif
sendItem(item) ;
}
void p3turtle::sendChunkMap(const std::string& peerId,const std::string& ,const CompressedChunkMap& cmap,bool is_client)
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
// get the proper tunnel for this file hash and peer id.
std::map<TurtleVirtualPeerId,TurtleTunnelId>::const_iterator it(_virtual_peers.find(peerId)) ;
if(it == _virtual_peers.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle::senddataRequest: cannot find virtual peer " << peerId << " in VP list." << std::endl ;
#endif
std::cerr << "p3Turtle::sendTurtleData(): asked to send a packet into a tunnel that is not registered. Dropping packet." << std::endl ;
delete item ;
return ;
}
TurtleTunnelId tunnel_id = it->second ;
TurtleTunnel& tunnel(_local_tunnels[tunnel_id]) ;
RsTurtleFileMapItem *item = new RsTurtleFileMapItem ;
item->tunnel_id = tunnel_id ;
item->compressed_map = cmap ;
std::string ownid = mLinkMgr->getOwnId() ;
if(tunnel.local_src == ownid)
{
assert(!is_client) ;
item->direction = RsTurtleGenericTunnelItem::DIRECTION_SERVER ;
item->PeerId(tunnel.local_dst) ;
}
else if(tunnel.local_dst == ownid)
{
assert(is_client) ;
item->direction = RsTurtleGenericTunnelItem::DIRECTION_CLIENT ;
item->PeerId(tunnel.local_src) ;
}
else
std::cerr << "p3turtle::sendChunkMap: consistency error!" << std::endl ;
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle: sending chunk map to peer " << peerId << ", hash=0x" << tunnel.hash << ") through tunnel " << (void*)item->tunnel_id << ", next peer=" << item->PeerId() << std::endl ;
#endif
sendItem(item) ;
}
void p3turtle::sendSingleChunkCRC(const std::string& peerId,const std::string&,uint32_t chunk_number,const Sha1CheckSum& crc)
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
// get the proper tunnel for this file hash and peer id.
std::map<TurtleVirtualPeerId,TurtleTunnelId>::const_iterator it(_virtual_peers.find(peerId)) ;
if(it == _virtual_peers.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle::sendCRC32MapRequest: cannot find virtual peer " << peerId << " in VP list." << std::endl ;
#endif
return ;
}
TurtleTunnelId tunnel_id = it->second ;
TurtleTunnel& tunnel(_local_tunnels[tunnel_id]) ;
RsTurtleChunkCrcItem *item = new RsTurtleChunkCrcItem;
item->tunnel_id = tunnel_id ;
item->chunk_number = chunk_number ;
item->check_sum = crc ;
item->PeerId(tunnel.local_dst) ;
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle: sending CRC32 map request to peer " << peerId << ", hash=0x" << tunnel.hash << ") through tunnel " << (void*)item->tunnel_id << ", next peer=" << item->PeerId() << std::endl ;
#endif
sendItem(item) ;
}
void p3turtle::sendSingleChunkCRCRequest(const std::string& peerId,const std::string&,uint32_t chunk_number)
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
// get the proper tunnel for this file hash and peer id.
std::map<TurtleVirtualPeerId,TurtleTunnelId>::const_iterator it(_virtual_peers.find(peerId)) ;
if(it == _virtual_peers.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle::sendCRC32MapRequest: cannot find virtual peer " << peerId << " in VP list." << std::endl ;
#endif
return ;
}
TurtleTunnelId tunnel_id = it->second ;
TurtleTunnel& tunnel(_local_tunnels[tunnel_id]) ;
RsTurtleChunkCrcRequestItem *item = new RsTurtleChunkCrcRequestItem;
item->tunnel_id = tunnel_id ;
item->chunk_number = chunk_number ;
item->PeerId(tunnel.local_dst) ;
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle: sending CRC32 map request to peer " << peerId << ", hash=0x" << tunnel.hash << ") through tunnel " << (void*)item->tunnel_id << ", next peer=" << item->PeerId() << std::endl ;
#endif
sendItem(item) ;
}
void p3turtle::sendCRC32MapRequest(const std::string& peerId,const std::string& )
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
// get the proper tunnel for this file hash and peer id.
std::map<TurtleVirtualPeerId,TurtleTunnelId>::const_iterator it(_virtual_peers.find(peerId)) ;
if(it == _virtual_peers.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle::sendCRC32MapRequest: cannot find virtual peer " << peerId << " in VP list." << std::endl ;
#endif
return ;
}
TurtleTunnelId tunnel_id = it->second ;
TurtleTunnel& tunnel(_local_tunnels[tunnel_id]) ;
RsTurtleFileCrcRequestItem *item = new RsTurtleFileCrcRequestItem;
item->tunnel_id = tunnel_id ;
// item->crc_map = cmap ;
item->PeerId(tunnel.local_dst) ;
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle: sending CRC32 map request to peer " << peerId << ", hash=0x" << tunnel.hash << ") through tunnel " << (void*)item->tunnel_id << ", next peer=" << item->PeerId() << std::endl ;
#endif
sendItem(item) ;
}
void p3turtle::sendCRC32Map(const std::string& peerId,const std::string& ,const CRC32Map& cmap)
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
// get the proper tunnel for this file hash and peer id.
std::map<TurtleVirtualPeerId,TurtleTunnelId>::const_iterator it(_virtual_peers.find(peerId)) ;
if(it == _virtual_peers.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle::senddataRequest: cannot find virtual peer " << peerId << " in VP list." << std::endl ;
#endif
return ;
}
TurtleTunnelId tunnel_id = it->second ;
TurtleTunnel& tunnel(_local_tunnels[tunnel_id]) ;
RsTurtleFileCrcItem *item = new RsTurtleFileCrcItem ;
item->tunnel_id = tunnel_id ;
item->crc_map = cmap ;
item->PeerId(tunnel.local_src) ;
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle: sending CRC32 map to peer " << peerId << ", hash=0x" << tunnel.hash << ") through tunnel " << (void*)item->tunnel_id << ", next peer=" << item->PeerId() << std::endl ;
std::cerr << "p3turtle: sending service packet to virtual peer id " << virtual_peer_id << ", hash=0x" << tunnel.hash << ", tunnel = " << (void*)item->tunnel_id << ", next peer=" << tunnel.local_dst << std::endl ;
#endif
sendItem(item) ;
}
@ -2302,7 +1788,7 @@ TurtleRequestId p3turtle::turtleSearch(const LinearizedExpression& expr)
return id ;
}
void p3turtle::monitorTunnels(const std::string& hash)
void p3turtle::monitorTunnels(const std::string& hash,RsTurtleClientService *client_service)
{
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
@ -2338,6 +1824,7 @@ void p3turtle::monitorTunnels(const std::string& hash)
// also should send associated request to the file transfer module.
_incoming_file_hashes[hash].last_digg_time = RSRandom::random_u32()%10 ;
_incoming_file_hashes[hash].service = client_service ;
}
IndicateConfigChanged() ; // initiates saving of handled hashes.
@ -2380,6 +1867,7 @@ void p3turtle::registerTunnelService(RsTurtleClientService *service)
std::cerr << "p3turtle: registered new tunnel service " << (void*)service << std::endl;
_registered_services.push_back(service) ;
_serialiser->registerClientService(service) ;
}
static std::string printFloatNumber(float num,bool friendly=false)