- Implemented chunk-based file transfer from partial sources. This in particular means:

- exchange of chunk availability maps from different peers
    - correct handling of what is available to which source before asking the data
    - correct display of chunks in the progress bars
    - generalised the use of compressed chunk maps
    - removed the size parameters from the hash search functions
   
- In addition:
    - suppressed a number of per-value transfers of std::string
    - improved the FileTransferInfo Widget, to show some additional info

Still to be done:
    - chunk map exchange for non anonymous traffic (easy)
    - improve accuracy of completion for uploads (for now it's a integer number of chunks)
    - check compilation on windows




git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@1993 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2010-01-11 16:00:42 +00:00
parent add5d45eeb
commit cfaaec31c7
36 changed files with 1247 additions and 573 deletions

View file

@ -782,25 +782,24 @@ void p3turtle::routeGenericTunnelItem(RsTurtleGenericTunnelItem *item)
// The packet was not forwarded, so it is for us. Let's treat it.
// This is done off-mutex, to avoid various deadlocks
//
if(direction == RsTurtleGenericTunnelItem::DIRECTION_SERVER)
switch(item->PacketSubType())
{
case RS_TURTLE_SUBTYPE_FILE_REQUEST: handleRecvFileRequest(dynamic_cast<RsTurtleFileRequestItem *>(item)) ;
break ;
default:
std::cerr << "Unknown server packet type received: id=" << (void*)(item->PacketSubType()) << std::endl ;
exit(-1) ;
}
switch(item->PacketSubType())
{
case RS_TURTLE_SUBTYPE_FILE_REQUEST: handleRecvFileRequest(dynamic_cast<RsTurtleFileRequestItem *>(item)) ;
break ;
if(direction == RsTurtleGenericTunnelItem::DIRECTION_CLIENT)
switch(item->PacketSubType())
{
case RS_TURTLE_SUBTYPE_FILE_DATA : handleRecvFileData(dynamic_cast<RsTurtleFileDataItem *>(item)) ;
break ;
default:
std::cerr << "Unknown client packet type received: id=" << (void*)(item->PacketSubType()) << std::endl ;
exit(-1) ;
}
case RS_TURTLE_SUBTYPE_FILE_DATA : handleRecvFileData(dynamic_cast<RsTurtleFileDataItem *>(item)) ;
break ;
case RS_TURTLE_SUBTYPE_FILE_MAP : handleRecvFileMap(dynamic_cast<RsTurtleFileMapItem *>(item)) ;
break ;
case RS_TURTLE_SUBTYPE_FILE_MAP_REQUEST: handleRecvFileMapRequest(dynamic_cast<RsTurtleFileMapRequestItem *>(item)) ;
break ;
default:
std::cerr << "Unknown packet type received: id=" << (void*)(item->PacketSubType()) << std::endl ;
exit(-1) ;
}
delete item ;
}
@ -908,6 +907,42 @@ void p3turtle::handleRecvFileData(RsTurtleFileDataItem *item)
// down _ft_server->getMultiplexer()->recvData()...in ftTransferModule::recvFileData
}
void p3turtle::handleRecvFileMapRequest(RsTurtleFileMapRequestItem *item)
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3Turtle: received file Map item:" << std::endl ;
item->print(std::cerr,1) ;
#endif
std::string hash,vpid ;
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
std::map<TurtleTunnelId,TurtleTunnel>::iterator it2(_local_tunnels.find(item->tunnel_id)) ;
if(it2 == _local_tunnels.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle: got file data with unknown tunnel id " << (void*)item->tunnel_id << std::endl ;
#endif
return ;
}
TurtleTunnel& tunnel(it2->second) ;
#ifdef P3TURTLE_DEBUG
assert(!tunnel.hash.empty()) ;
std::cerr << " This is an endpoint for this file map request." << std::endl ;
std::cerr << " Forwarding data to the multiplexer." << std::endl ;
std::cerr << " using peer_id=" << tunnel.vpid << ", hash=" << tunnel.hash << std::endl ;
#endif
// we should check that there is no backward call to the turtle router!
//
hash = tunnel.hash ;
vpid = tunnel.vpid ;
}
_ft_server->getMultiplexer()->recvChunkMapRequest(vpid,hash,item->direction == RsTurtleGenericTunnelItem::DIRECTION_CLIENT) ;
}
void p3turtle::handleRecvFileMap(RsTurtleFileMapItem *item)
{
@ -915,6 +950,7 @@ void p3turtle::handleRecvFileMap(RsTurtleFileMapItem *item)
std::cerr << "p3Turtle: received file Map item:" << std::endl ;
item->print(std::cerr,1) ;
#endif
std::string hash,vpid ;
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
@ -930,31 +966,19 @@ void p3turtle::handleRecvFileMap(RsTurtleFileMapItem *item)
TurtleTunnel& tunnel(it2->second) ;
std::map<TurtleFileHash,TurtleFileHashInfo>::iterator it( _incoming_file_hashes.find(tunnel.hash) ) ;
#ifdef P3TURTLE_DEBUG
assert(!tunnel.hash.empty()) ;
#endif
if(it==_incoming_file_hashes.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << "No tunnel for incoming data. Maybe the tunnel is being closed." << std::endl ;
#endif
return ;
}
const TurtleFileHashInfo& hash_info(it->second) ;
#ifdef P3TURTLE_DEBUG
std::cerr << " This is an endpoint for this file map." << std::endl ;
std::cerr << " Forwarding data to the multiplexer." << std::endl ;
std::cerr << " using peer_id=" << tunnel.vpid << ", hash=" << tunnel.hash << std::endl ;
#endif
// also update the hash time stamp to show that it's actually being downloaded.
it->second.time_stamp = time(NULL) ;
// we should check that there is no backward call to the turtle router!
// We should check that there is no backward call to the turtle router!
//
_ft_server->getMultiplexer()->recvFileMap(tunnel.vpid,tunnel.hash,item->chunk_size,item->nb_chunks,item->compressed_map) ;
vpid = tunnel.vpid ;
hash = tunnel.hash ;
}
_ft_server->getMultiplexer()->recvChunkMap(vpid,hash,item->compressed_map,item->direction == RsTurtleGenericTunnelItem::DIRECTION_CLIENT) ;
}
// Send a data request into the correct tunnel for the given file hash
@ -1032,6 +1056,95 @@ void p3turtle::sendFileData(const std::string& peerId, const std::string& hash,
sendItem(item) ;
}
void p3turtle::sendChunkMapRequest(const std::string& peerId,const std::string& hash)
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
// get the proper tunnel for this file hash and peer id.
std::map<TurtleVirtualPeerId,TurtleTunnelId>::const_iterator it(_virtual_peers.find(peerId)) ;
if(it == _virtual_peers.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle::senddataRequest: cannot find virtual peer " << peerId << " in VP list." << std::endl ;
#endif
return ;
}
TurtleTunnelId tunnel_id = it->second ;
TurtleTunnel& tunnel(_local_tunnels[tunnel_id]) ;
#ifdef P3TURTLE_DEBUG
assert(hash == tunnel.hash) ;
#endif
RsTurtleFileMapRequestItem *item = new RsTurtleFileMapRequestItem ;
item->tunnel_id = tunnel_id ;
std::string ownid = mConnMgr->getOwnId() ;
if(tunnel.local_src == ownid)
{
item->direction = RsTurtleGenericTunnelItem::DIRECTION_SERVER ;
item->PeerId(tunnel.local_dst) ;
}
else if(tunnel.local_dst == ownid)
{
item->direction = RsTurtleGenericTunnelItem::DIRECTION_CLIENT ;
item->PeerId(tunnel.local_src) ;
}
else
std::cerr << "p3turtle::sendChunkMap: consistency error!" << std::endl ;
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle: sending chunk map req to peer " << peerId << ", hash=0x" << hash << ") through tunnel " << (void*)item->tunnel_id << ", next peer=" << item->PeerId() << std::endl ;
#endif
sendItem(item) ;
}
void p3turtle::sendChunkMap(const std::string& peerId,const std::string& hash,const CompressedChunkMap& cmap)
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
// get the proper tunnel for this file hash and peer id.
std::map<TurtleVirtualPeerId,TurtleTunnelId>::const_iterator it(_virtual_peers.find(peerId)) ;
if(it == _virtual_peers.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle::senddataRequest: cannot find virtual peer " << peerId << " in VP list." << std::endl ;
#endif
return ;
}
TurtleTunnelId tunnel_id = it->second ;
TurtleTunnel& tunnel(_local_tunnels[tunnel_id]) ;
#ifdef P3TURTLE_DEBUG
assert(hash == tunnel.hash) ;
#endif
RsTurtleFileMapItem *item = new RsTurtleFileMapItem ;
item->tunnel_id = tunnel_id ;
item->compressed_map = cmap ;
std::string ownid = mConnMgr->getOwnId() ;
if(tunnel.local_src == ownid)
{
item->direction = RsTurtleGenericTunnelItem::DIRECTION_SERVER ;
item->PeerId(tunnel.local_dst) ;
}
else if(tunnel.local_dst == ownid)
{
item->direction = RsTurtleGenericTunnelItem::DIRECTION_CLIENT ;
item->PeerId(tunnel.local_src) ;
}
else
std::cerr << "p3turtle::sendChunkMap: consistency error!" << std::endl ;
#ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle: sending chunk map to peer " << peerId << ", hash=0x" << hash << ") through tunnel " << (void*)item->tunnel_id << ", next peer=" << item->PeerId() << std::endl ;
#endif
sendItem(item) ;
}
// This function is actually not needed: Search request to the turtle router are:
// - distant search requests, handled by the router
// - search requests over files being downloaded, handled by rsFiles !!
@ -1573,7 +1686,7 @@ static std::string printNumber(uint64_t num,bool hex=false)
if(hex)
{
char tmp[100] ;
sprintf(tmp,"0x%08x%08x", uint32_t(num >> 32),uint32_t(num & ( (1<<32)-1 ))) ;
sprintf(tmp,"%08x%08x", uint32_t(num >> 32),uint32_t(num & ( (1<<32)-1 ))) ;
return std::string(tmp) ;
}
else

View file

@ -280,6 +280,13 @@ class p3turtle: public p3Service, public pqiMonitor, public RsTurtle,/* public f
/// Send file data into the correct tunnel for the given file hash
void sendFileData(const std::string& peerId, const std::string& hash, uint64_t size, uint64_t baseoffset, uint32_t chunksize, void *data) ;
/// Send a request for the chunk map of this file to the given peer
void sendChunkMapRequest(const std::string& peerId, const std::string& hash) ;
/// Send a chunk map of this file to the given peer
void sendChunkMap(const std::string& peerId, const std::string& hash,const CompressedChunkMap& cmap) ;
private:
//--------------------------- Admin/Helper functions -------------------------//
@ -321,6 +328,7 @@ class p3turtle: public p3Service, public pqiMonitor, public RsTurtle,/* public f
void handleTunnelResult(RsTurtleTunnelOkItem *item);
void handleRecvFileRequest(RsTurtleFileRequestItem *item);
void handleRecvFileData(RsTurtleFileDataItem *item);
void handleRecvFileMapRequest(RsTurtleFileMapRequestItem*);
void handleRecvFileMap(RsTurtleFileMapItem*);
//------ Functions connecting the turtle router to other components.----------//

View file

@ -114,17 +114,27 @@ uint32_t RsTurtleFileDataItem::serial_size()
return s ;
}
uint32_t RsTurtleFileMapRequestItem::serial_size()
{
uint32_t s = 0 ;
s += 8 ; // header
s += 4 ; // tunnel id
s += 4 ; // direction
return s ;
}
uint32_t RsTurtleFileMapItem::serial_size()
{
uint32_t s = 0 ;
s += 8 ; // header
s += 4 ; // tunnel id
s += 4 ; // chunk_size
s += 4 ; // nb_chunks
s += 4 ; // direction
s += 4 ; // compressed_map.size()
s += 4 * compressed_map.size() ;
s += 4 * compressed_map._map.size() ;
return s ;
}
@ -162,6 +172,7 @@ RsItem *RsTurtleSerialiser::deserialise(void *data, uint32_t *size)
case RS_TURTLE_SUBTYPE_TUNNEL_OK : return new RsTurtleTunnelOkItem(data,*size) ;
case RS_TURTLE_SUBTYPE_FILE_REQUEST : return new RsTurtleFileRequestItem(data,*size) ;
case RS_TURTLE_SUBTYPE_FILE_DATA : return new RsTurtleFileDataItem(data,*size) ;
case RS_TURTLE_SUBTYPE_FILE_MAP_REQUEST : return new RsTurtleFileMapRequestItem(data,*size) ;
case RS_TURTLE_SUBTYPE_FILE_MAP : return new RsTurtleFileMapItem(data,*size) ;
default:
@ -179,6 +190,39 @@ RsItem *RsTurtleSerialiser::deserialise(void *data, uint32_t *size)
}
bool RsTurtleFileMapRequestItem::serialize(void *data,uint32_t& pktsize)
{
uint32_t tlvsize = serial_size();
uint32_t offset = 0;
if (pktsize < tlvsize)
return false; /* not enough space */
pktsize = tlvsize;
bool ok = true;
ok &= setRsItemHeader(data,tlvsize,PacketId(), tlvsize);
/* skip the header */
offset += 8;
/* add mandatory parts first */
ok &= setRawUInt32(data, tlvsize, &offset, tunnel_id);
ok &= setRawUInt32(data, tlvsize, &offset, direction);
if (offset != tlvsize)
{
ok = false;
#ifdef RSSERIAL_DEBUG
std::cerr << "RsFileConfigSerialiser::serialiseTransfer() Size Error! " << std::endl;
#endif
}
return ok;
}
bool RsTurtleFileMapItem::serialize(void *data,uint32_t& pktsize)
{
uint32_t tlvsize = serial_size();
@ -199,12 +243,11 @@ bool RsTurtleFileMapItem::serialize(void *data,uint32_t& pktsize)
/* add mandatory parts first */
ok &= setRawUInt32(data, tlvsize, &offset, tunnel_id);
ok &= setRawUInt32(data, tlvsize, &offset, chunk_size);
ok &= setRawUInt32(data, tlvsize, &offset, nb_chunks);
ok &= setRawUInt32(data, tlvsize, &offset, compressed_map.size());
ok &= setRawUInt32(data, tlvsize, &offset, direction);
ok &= setRawUInt32(data, tlvsize, &offset, compressed_map._map.size());
for(uint32_t i=0;i<compressed_map.size() && ok;++i)
ok &= setRawUInt32(data, tlvsize, &offset, compressed_map[i]);
for(uint32_t i=0;i<compressed_map._map.size() && ok;++i)
ok &= setRawUInt32(data, tlvsize, &offset, compressed_map._map[i]);
if (offset != tlvsize)
{
@ -409,24 +452,54 @@ RsTurtleFileMapItem::RsTurtleFileMapItem(void *data,uint32_t pktsize)
: RsTurtleGenericTunnelItem(RS_TURTLE_SUBTYPE_FILE_MAP)
{
#ifdef P3TURTLE_DEBUG
std::cerr << " type = search result" << std::endl ;
std::cerr << " type = file map item" << std::endl ;
#endif
uint32_t offset = 8; // skip the header
uint32_t rssize = getRsItemSize(data);
/* add mandatory parts first */
bool ok = true ;
uint32_t s ;
uint32_t s,d ;
ok &= getRawUInt32(data, pktsize, &offset, &tunnel_id);
ok &= getRawUInt32(data, pktsize, &offset, &chunk_size);
ok &= getRawUInt32(data, pktsize, &offset, &nb_chunks) ;
ok &= getRawUInt32(data, pktsize, &offset, &d);
direction = d ;
ok &= getRawUInt32(data, pktsize, &offset, &s) ;
compressed_map.resize(s) ;
compressed_map._map.resize(s) ;
for(uint32_t i=0;i<s && ok;++i)
ok &= getRawUInt32(data, pktsize, &offset, &(compressed_map[i])) ;
ok &= getRawUInt32(data, pktsize, &offset, &(compressed_map._map[i])) ;
#ifdef WINDOWS_SYS // No Exceptions in Windows compile. (drbobs).
#else
if (offset != pktsize)
throw std::runtime_error("Size error while deserializing.") ;
if (!ok)
throw std::runtime_error("Unknown error while deserializing.") ;
#endif
}
RsTurtleFileMapRequestItem::RsTurtleFileMapRequestItem(void *data,uint32_t pktsize)
: RsTurtleGenericTunnelItem(RS_TURTLE_SUBTYPE_FILE_MAP_REQUEST)
{
#ifdef P3TURTLE_DEBUG
std::cerr << " type = file map request item" << std::endl ;
#endif
uint32_t offset = 8; // skip the header
/* add mandatory parts first */
bool ok = true ;
ok &= getRawUInt32(data, pktsize, &offset, &tunnel_id);
ok &= getRawUInt32(data, pktsize, &offset, &direction);
#ifdef WINDOWS_SYS // No Exceptions in Windows compile. (drbobs).
#else
if (offset != pktsize)
throw std::runtime_error("Size error while deserializing.") ;
if (!ok)
throw std::runtime_error("Unknown error while deserializing.") ;
#endif
}
RsTurtleSearchResultItem::RsTurtleSearchResultItem(void *data,uint32_t pktsize)
@ -826,12 +899,21 @@ std::ostream& RsTurtleFileMapItem::print(std::ostream& o, uint16_t)
o << "File map item:" << std::endl ;
o << " tunnel id : " << (void*)tunnel_id << std::endl ;
o << " chunk size: " << chunk_size << std::endl ;
o << " nb chunks : " << nb_chunks << std::endl ;
o << " direction : " << direction << std::endl ;
o << " map : " ;
for(uint32_t i=0;i<compressed_map.size();++i)
o << (void*)compressed_map[i] << std::endl ;
for(uint32_t i=0;i<compressed_map._map.size();++i)
o << (void*)compressed_map._map[i] << std::endl ;
return o ;
}
std::ostream& RsTurtleFileMapRequestItem::print(std::ostream& o, uint16_t)
{
o << "File map request item:" << std::endl ;
o << " tunnel id : " << (void*)tunnel_id << std::endl ;
o << " direction : " << direction << std::endl ;
return o ;
}

View file

@ -5,6 +5,7 @@
#include "serialiser/rsbaseserial.h"
#include "rsiface/rsturtle.h"
#include "rsiface/rsexpr.h"
#include "rsiface/rstypes.h"
#include "serialiser/rsserviceids.h"
#include "turtle/turtletypes.h"
@ -18,6 +19,7 @@ const uint8_t RS_TURTLE_SUBTYPE_FILE_REQUEST = 0x07 ;
const uint8_t RS_TURTLE_SUBTYPE_FILE_DATA = 0x08 ;
const uint8_t RS_TURTLE_SUBTYPE_REGEXP_SEARCH_REQUEST = 0x09 ;
const uint8_t RS_TURTLE_SUBTYPE_FILE_MAP = 0x10 ;
const uint8_t RS_TURTLE_SUBTYPE_FILE_MAP_REQUEST = 0x11 ;
/***********************************************************************************/
/* Basic Turtle Item Class */
@ -153,7 +155,9 @@ class RsTurtleGenericTunnelItem: public RsTurtleItem
public:
RsTurtleGenericTunnelItem(uint8_t sub_packet_id) : RsTurtleItem(sub_packet_id) {}
typedef enum { DIRECTION_CLIENT, DIRECTION_SERVER } Direction ;
typedef uint32_t Direction ;
static const Direction DIRECTION_CLIENT = 0x001 ;
static const Direction DIRECTION_SERVER = 0x002 ;
/// Does this packet stamps tunnels when it passes through ?
/// This is used for keeping trace weither tunnels are active or not.
@ -218,6 +222,26 @@ class RsTurtleFileDataItem: public RsTurtleGenericTunnelItem
virtual uint32_t serial_size() ;
};
class RsTurtleFileMapRequestItem: public RsTurtleGenericTunnelItem
{
public:
RsTurtleFileMapRequestItem() : RsTurtleGenericTunnelItem(RS_TURTLE_SUBTYPE_FILE_MAP_REQUEST) {}
RsTurtleFileMapRequestItem(void *data,uint32_t size) ; // deserialization
virtual bool shouldStampTunnel() const { return false ; }
virtual TurtleTunnelId tunnelId() const { return tunnel_id ; }
virtual Direction travelingDirection() const { return direction ; }
Direction direction ; // travel direction for this packet (server/client)
uint32_t tunnel_id ; // id of the tunnel to travel through. Also used for identifying the file source
// this info from the file size, but this allows a security check.
virtual std::ostream& print(std::ostream& o, uint16_t) ;
virtual bool serialize(void *data,uint32_t& size) ;
virtual uint32_t serial_size() ;
};
class RsTurtleFileMapItem: public RsTurtleGenericTunnelItem
{
public:
@ -230,11 +254,9 @@ class RsTurtleFileMapItem: public RsTurtleGenericTunnelItem
Direction direction ; // travel direction for this packet (server/client)
uint32_t tunnel_id ; // id of the tunnel to travel through. Also used for identifying the file source
uint32_t chunk_size ; // fixed size of chunks, as seen from the source, for the given map.
uint32_t nb_chunks ; // number of chunks in the file. The last two infos are redundant, as we can recompute
// this info from the file size, but this allows a security check.
std::vector<uint32_t> compressed_map ; // Map info for the file in compressed format. Each *bit* in the array uint's says "I have" or "I don't have"
CompressedChunkMap compressed_map ; // Map info for the file in compressed format. Each *bit* in the array uint's says "I have" or "I don't have"
// by default, we suppose the peer has all the chunks. This info will thus be and-ed
// with the default file map for this source.