- implemented bidirectional exchange of chunk maps for direct downloads, to allow showing proper completion of transfers from direct friends.

- moved the direction flag upward in the pipeline (ftDataSend instead of p3turtle)




git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@3313 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2010-07-25 19:04:31 +00:00
parent 569ac25843
commit cee8600a93
13 changed files with 483 additions and 71 deletions

View File

@ -54,10 +54,14 @@ class ftDataSend
/* Server Send */ /* Server Send */
virtual bool sendData(const std::string& peerId, const std::string& hash, uint64_t size, uint64_t offset, uint32_t chunksize, void *data) = 0; virtual bool sendData(const std::string& peerId, const std::string& hash, uint64_t size, uint64_t offset, uint32_t chunksize, void *data) = 0;
/// Send a request for a chunk map /// Send a chunkmap[request]. Because requests/chunkmaps can go both
virtual bool sendChunkMapRequest(const std::string& peer_id,const std::string& hash) = 0; //directions, but for different usages, we have this "is_client" flags,
/// Send a chunk map //that gives the ultimate goal of the data. "is_client==true" means that
virtual bool sendChunkMap(const std::string& peer_id,const std::string& hash,const CompressedChunkMap& cmap) = 0; //the message is for a client (download) instead of a server.
//
virtual bool sendChunkMapRequest(const std::string& peer_id,const std::string& hash,bool is_client) = 0;
virtual bool sendChunkMap(const std::string& peer_id,const std::string& hash,const CompressedChunkMap& cmap,bool is_client) = 0;
/// Send a request for a chunk crc map /// Send a request for a chunk crc map
virtual bool sendCRC32MapRequest(const std::string& peer_id,const std::string& hash) = 0; virtual bool sendCRC32MapRequest(const std::string& peer_id,const std::string& hash) = 0;
/// Send a chunk crc map /// Send a chunk crc map

View File

@ -528,7 +528,7 @@ bool ftDataMultiplex::handleRecvClientChunkMapRequest(const std::string& peerId,
(it->second).mCreator->getAvailabilityMap(cmap); (it->second).mCreator->getAvailabilityMap(cmap);
} }
mDataSend->sendChunkMap(peerId,hash,cmap); mDataSend->sendChunkMap(peerId,hash,cmap,false);
return true ; return true ;
} }
@ -574,7 +574,7 @@ bool ftDataMultiplex::handleRecvServerChunkMapRequest(const std::string& peerId,
it->second->getAvailabilityMap(cmap); it->second->getAvailabilityMap(cmap);
} }
mDataSend->sendChunkMap(peerId,hash,cmap); mDataSend->sendChunkMap(peerId,hash,cmap,true);
return true; return true;
} }
@ -719,14 +719,14 @@ bool ftDataMultiplex::getClientChunkMap(const std::string& upload_hash,const std
// If the map is too old then we should ask an other map to the peer. // If the map is too old then we should ask an other map to the peer.
// //
if(too_old) if(too_old)
sendChunkMapRequest(peerId,upload_hash); sendChunkMapRequest(peerId,upload_hash,true);
return true ; return true ;
} }
bool ftDataMultiplex::sendChunkMapRequest(const std::string& peer_id,const std::string& hash) bool ftDataMultiplex::sendChunkMapRequest(const std::string& peer_id,const std::string& hash,bool is_client)
{ {
return mDataSend->sendChunkMapRequest(peer_id,hash); return mDataSend->sendChunkMapRequest(peer_id,hash,is_client);
} }
bool ftDataMultiplex::sendCRCMapRequest(const std::string& peer_id,const std::string& hash,const CompressedChunkMap&) bool ftDataMultiplex::sendCRCMapRequest(const std::string& peer_id,const std::string& hash,const CompressedChunkMap&)
{ {

View File

@ -108,7 +108,7 @@ class ftDataMultiplex: public ftDataRecv, public RsQueueThread
bool sendData(const std::string& peerId, const std::string& hash, uint64_t size, uint64_t offset, uint32_t chunksize, void *data); bool sendData(const std::string& peerId, const std::string& hash, uint64_t size, uint64_t offset, uint32_t chunksize, void *data);
/* Server/client Send */ /* Server/client Send */
bool sendChunkMapRequest(const std::string& peerId, const std::string& hash) ; bool sendChunkMapRequest(const std::string& peerId, const std::string& hash,bool is_client) ;
/* Client Send */ /* Client Send */
bool sendCRCMapRequest(const std::string& peerId, const std::string& hash,const CompressedChunkMap& chnks) ; bool sendCRCMapRequest(const std::string& peerId, const std::string& hash,const CompressedChunkMap& chnks) ;

View File

@ -738,27 +738,50 @@ bool ftServer::sendDataRequest(const std::string& peerId, const std::string& has
return true; return true;
} }
bool ftServer::sendChunkMapRequest(const std::string& peerId,const std::string& hash) bool ftServer::sendChunkMapRequest(const std::string& peerId,const std::string& hash,bool is_client)
{ {
if(mTurtleRouter->isTurtlePeer(peerId)) if(mTurtleRouter->isTurtlePeer(peerId))
mTurtleRouter->sendChunkMapRequest(peerId,hash) ; mTurtleRouter->sendChunkMapRequest(peerId,hash,is_client) ;
else else
std::cerr << "ftServer: Warning: not sending chunk map request to peer " << peerId << ", because it's not a turtle tunnel." << std::endl ; {
/* create a packet */
/* push to networking part */
RsFileChunkMapRequest *rfi = new RsFileChunkMapRequest();
/* id */
rfi->PeerId(peerId);
/* file info */
rfi->hash = hash; /* ftr->hash; */
rfi->is_client = is_client ;
mP3iface->SendFileChunkMapRequest(rfi);
}
// We only send chunkmap requests to turtle peers. This will be a problem at display time for
// direct friends, so I'll see later whether I code it or not.
return true ; return true ;
} }
bool ftServer::sendChunkMap(const std::string& peerId,const std::string& hash,const CompressedChunkMap& map) bool ftServer::sendChunkMap(const std::string& peerId,const std::string& hash,const CompressedChunkMap& map,bool is_client)
{ {
if(mTurtleRouter->isTurtlePeer(peerId)) if(mTurtleRouter->isTurtlePeer(peerId))
mTurtleRouter->sendChunkMap(peerId,hash,map) ; mTurtleRouter->sendChunkMap(peerId,hash,map,is_client) ;
else else
std::cerr << "ftServer: Warning: not sending chunk map to peer " << peerId << ", because it's not a turtle tunnel." << std::endl ; {
/* create a packet */
/* push to networking part */
RsFileChunkMap *rfi = new RsFileChunkMap();
/* id */
rfi->PeerId(peerId);
/* file info */
rfi->hash = hash; /* ftr->hash; */
rfi->is_client = is_client; /* ftr->hash; */
rfi->compressed_map = map; /* ftr->hash; */
mP3iface->SendFileChunkMap(rfi);
}
// We only send chunkmap requests to turtle peers. This will be a problem at display time for
// direct friends, so I'll see later whether I code it or not.
return true ; return true ;
} }
bool ftServer::sendCRC32MapRequest(const std::string& peerId,const std::string& hash) bool ftServer::sendCRC32MapRequest(const std::string& peerId,const std::string& hash)
@ -1022,6 +1045,8 @@ bool ftServer::handleFileData()
// now File Input. // now File Input.
RsFileRequest *fr; RsFileRequest *fr;
RsFileData *fd; RsFileData *fd;
RsFileChunkMapRequest *fcmr;
RsFileChunkMap *fcm;
int i_init = 0; int i_init = 0;
int i = 0; int i = 0;
@ -1082,7 +1107,48 @@ FileInfo(ffr);
fd->fd.binData.TlvShallowClear(); fd->fd.binData.TlvShallowClear();
delete fd; delete fd;
} }
// now file chunkmap requests
i_init = i;
while((fcmr = mP3iface -> GetFileChunkMapRequest()) != NULL )
{
#ifdef SERVER_DEBUG
std::cerr << "ftServer::handleFileData() Recvd ChunkMap request" << std::endl;
std::ostringstream out;
if (i == i_init)
{
out << "Incoming(Net) File Data:" << std::endl;
}
fcmr -> print(out);
rslog(RSL_DEBUG_BASIC, ftserverzone, out.str());
#endif
i++; /* count */
/* incoming data */
mFtDataplex->recvChunkMapRequest(fcmr->PeerId(), fcmr->hash,fcmr->is_client) ;
delete fcmr;
}
// now file chunkmaps
i_init = i;
while((fcm = mP3iface -> GetFileChunkMap()) != NULL )
{
#ifdef SERVER_DEBUG
std::cerr << "ftServer::handleFileData() Recvd ChunkMap request" << std::endl;
std::ostringstream out;
if (i == i_init)
{
out << "Incoming(Net) File Data:" << std::endl;
}
fcm -> print(out);
rslog(RSL_DEBUG_BASIC, ftserverzone, out.str());
#endif
i++; /* count */
/* incoming data */
mFtDataplex->recvChunkMap(fcm->PeerId(), fcm->hash,fcm->compressed_map,fcm->is_client) ;
delete fcm;
}
if (i > 0) if (i > 0)
{ {
return 1; return 1;

View File

@ -210,8 +210,8 @@ virtual bool unshareDownloadDirectory();
public: public:
virtual bool sendData(const std::string& peerId, const std::string& hash, uint64_t size, uint64_t offset, uint32_t chunksize, void *data); virtual bool sendData(const std::string& peerId, const std::string& hash, uint64_t size, uint64_t offset, uint32_t chunksize, void *data);
virtual bool sendDataRequest(const std::string& peerId, const std::string& hash, uint64_t size, uint64_t offset, uint32_t chunksize); virtual bool sendDataRequest(const std::string& peerId, const std::string& hash, uint64_t size, uint64_t offset, uint32_t chunksize);
virtual bool sendChunkMapRequest(const std::string& peer_id,const std::string& hash) ; virtual bool sendChunkMapRequest(const std::string& peer_id,const std::string& hash,bool is_client) ;
virtual bool sendChunkMap(const std::string& peer_id,const std::string& hash,const CompressedChunkMap& cmap) ; virtual bool sendChunkMap(const std::string& peer_id,const std::string& hash,const CompressedChunkMap& cmap,bool is_client) ;
virtual bool sendCRC32MapRequest(const std::string&, const std::string&) ; virtual bool sendCRC32MapRequest(const std::string&, const std::string&) ;
virtual bool sendCRC32Map(const std::string&, const std::string&, const CRC32Map&) ; virtual bool sendCRC32Map(const std::string&, const std::string&, const CRC32Map&) ;

View File

@ -348,7 +348,7 @@ bool ftTransferModule::getChunk(const std::string& peer_id,uint32_t size_hint,ui
bool val = mFileCreator->getMissingChunk(peer_id,size_hint,offset, chunk_size,source_peer_map_needed); bool val = mFileCreator->getMissingChunk(peer_id,size_hint,offset, chunk_size,source_peer_map_needed);
if(source_peer_map_needed) if(source_peer_map_needed)
mMultiplexor->sendChunkMapRequest(peer_id, mHash) ; mMultiplexor->sendChunkMapRequest(peer_id, mHash,false) ;
#ifdef FT_DEBUG #ifdef FT_DEBUG
if (val) if (val)
@ -683,7 +683,6 @@ bool ftTransferModule::checkCRC()
// _crcmap_last_source_id = (_crcmap_last_source_id+1)%mFileSources.size() ; // _crcmap_last_source_id = (_crcmap_last_source_id+1)%mFileSources.size() ;
int n=0 ;
bool found = false ; bool found = false ;
std::map<std::string,peerInfo>::const_iterator mit ; std::map<std::string,peerInfo>::const_iterator mit ;
for(mit = mFileSources.begin();mit != mFileSources.end();++mit) for(mit = mFileSources.begin();mit != mFileSources.end();++mit)

View File

@ -66,6 +66,12 @@ virtual int SendFileRequest(RsFileRequest *) = 0;
virtual RsFileData *GetFileData() = 0; virtual RsFileData *GetFileData() = 0;
virtual int SendFileData(RsFileData *) = 0; virtual int SendFileData(RsFileData *) = 0;
virtual RsFileChunkMapRequest *GetFileChunkMapRequest() = 0;
virtual int SendFileChunkMapRequest(RsFileChunkMapRequest *) = 0;
virtual RsFileChunkMap *GetFileChunkMap() = 0;
virtual int SendFileChunkMap(RsFileChunkMap *) = 0;
}; };
class P3Interface: public SearchInterface class P3Interface: public SearchInterface

View File

@ -311,7 +311,14 @@ int pqihandler::SendFileData(RsFileData *ns)
{ {
return HandleRsItem(ns, 0); return HandleRsItem(ns, 0);
} }
int pqihandler::SendFileChunkMapRequest(RsFileChunkMapRequest *ns)
{
return HandleRsItem(ns, 0);
}
int pqihandler::SendFileChunkMap(RsFileChunkMap *ns)
{
return HandleRsItem(ns, 0);
}
int pqihandler::SendRsRawItem(RsRawItem *ns) int pqihandler::SendRsRawItem(RsRawItem *ns)
{ {
pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, pqioutput(PQL_DEBUG_BASIC, pqihandlerzone,
@ -450,24 +457,35 @@ void pqihandler::locked_SortnStoreItem(RsItem *item)
case RS_PKT_TYPE_FILE: case RS_PKT_TYPE_FILE:
switch(subtype) switch(subtype)
{ {
case RS_PKT_SUBTYPE_FI_REQUEST: case RS_PKT_SUBTYPE_FI_REQUEST:
pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, pqioutput(PQL_DEBUG_BASIC, pqihandlerzone,
"SortnStore -> File Request"); "SortnStore -> File Request");
in_request.push_back(item); in_request.push_back(item);
item = NULL; item = NULL;
break; break;
case RS_PKT_SUBTYPE_FI_DATA: case RS_PKT_SUBTYPE_FI_DATA:
pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, "SortnStore -> File Data");
"SortnStore -> File Data"); in_data.push_back(item);
in_data.push_back(item); item = NULL;
item = NULL; break;
break;
default: case RS_PKT_SUBTYPE_FI_CHUNK_MAP_REQUEST:
break; /* no match! */ pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, "SortnStore -> File ChunkMap Request");
} in_chunkmap_request.push_back(item);
item = NULL;
break;
case RS_PKT_SUBTYPE_FI_CHUNK_MAP:
pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, "SortnStore -> File ChunkMap");
in_chunkmap.push_back(item);
item = NULL;
break;
default:
break; /* no match! */
}
break; break;
default: default:
@ -549,6 +567,34 @@ RsFileData *pqihandler::GetFileData()
} }
return NULL; return NULL;
} }
RsFileChunkMapRequest *pqihandler::GetFileChunkMapRequest()
{
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
if (in_chunkmap_request.size() != 0)
{
RsFileChunkMapRequest *fi = dynamic_cast<RsFileChunkMapRequest *>(in_chunkmap_request.front());
if (!fi) { delete in_chunkmap_request.front(); }
in_chunkmap_request.pop_front();
return fi;
}
return NULL;
}
RsFileChunkMap *pqihandler::GetFileChunkMap()
{
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
if (in_chunkmap.size() != 0)
{
RsFileChunkMap *fi = dynamic_cast<RsFileChunkMap *>(in_chunkmap.front());
if (!fi) { delete in_chunkmap.front(); }
in_chunkmap.pop_front();
return fi;
}
return NULL;
}
RsRawItem *pqihandler::GetRsRawItem() RsRawItem *pqihandler::GetRsRawItem()
{ {

View File

@ -63,8 +63,12 @@ virtual RsCacheItem * GetSearchResult();
// file i/o // file i/o
virtual int SendFileRequest(RsFileRequest *ns); virtual int SendFileRequest(RsFileRequest *ns);
virtual int SendFileData(RsFileData *ns); virtual int SendFileData(RsFileData *ns);
virtual RsFileRequest * GetFileRequest(); virtual int SendFileChunkMapRequest(RsFileChunkMapRequest *ns);
virtual RsFileData * GetFileData(); virtual int SendFileChunkMap(RsFileChunkMap *ns);
virtual RsFileRequest *GetFileRequest();
virtual RsFileData *GetFileData();
virtual RsFileChunkMapRequest *GetFileChunkMapRequest();
virtual RsFileChunkMap *GetFileChunkMap();
// Rest of P3Interface // Rest of P3Interface
virtual int tick(); virtual int tick();
@ -101,8 +105,7 @@ void locked_SortnStoreItem(RsItem *item);
SecurityPolicy *globsec; SecurityPolicy *globsec;
// Temporary storage... // Temporary storage...
std::list<RsItem *> in_result, in_search, std::list<RsItem *> in_result, in_search, in_request, in_data, in_service,in_chunkmap,in_chunkmap_request;
in_request, in_data, in_service;
private: private:

View File

@ -26,6 +26,7 @@
#include "serialiser/rsbaseserial.h" #include "serialiser/rsbaseserial.h"
#include "serialiser/rsbaseitems.h" #include "serialiser/rsbaseitems.h"
#include "serialiser/rstlvbase.h"
/*** /***
#define RSSERIAL_DEBUG 1 #define RSSERIAL_DEBUG 1
@ -39,6 +40,8 @@ uint32_t RsFileItemSerialiser::size(RsItem *i)
{ {
RsFileRequest *rfr; RsFileRequest *rfr;
RsFileData *rfd; RsFileData *rfd;
RsFileChunkMapRequest *rfcmr;
RsFileChunkMap *rfcm;
if (NULL != (rfr = dynamic_cast<RsFileRequest *>(i))) if (NULL != (rfr = dynamic_cast<RsFileRequest *>(i)))
{ {
@ -48,6 +51,14 @@ uint32_t RsFileItemSerialiser::size(RsItem *i)
{ {
return sizeData(rfd); return sizeData(rfd);
} }
else if (NULL != (rfcmr = dynamic_cast<RsFileChunkMapRequest *>(i)))
{
return sizeChunkMapReq(rfcmr);
}
else if (NULL != (rfcm = dynamic_cast<RsFileChunkMap *>(i)))
{
return sizeChunkMap(rfcm);
}
return 0; return 0;
} }
@ -57,6 +68,8 @@ bool RsFileItemSerialiser::serialise(RsItem *i, void *data, uint32_t *pktsize
{ {
RsFileRequest *rfr; RsFileRequest *rfr;
RsFileData *rfd; RsFileData *rfd;
RsFileChunkMapRequest *rfcmr;
RsFileChunkMap *rfcm;
if (NULL != (rfr = dynamic_cast<RsFileRequest *>(i))) if (NULL != (rfr = dynamic_cast<RsFileRequest *>(i)))
{ {
@ -66,7 +79,14 @@ bool RsFileItemSerialiser::serialise(RsItem *i, void *data, uint32_t *pktsize
{ {
return serialiseData(rfd, data, pktsize); return serialiseData(rfd, data, pktsize);
} }
else if (NULL != (rfcmr = dynamic_cast<RsFileChunkMapRequest *>(i)))
{
return serialiseChunkMapReq(rfcmr,data,pktsize);
}
else if (NULL != (rfcm = dynamic_cast<RsFileChunkMap *>(i)))
{
return serialiseChunkMap(rfcm,data,pktsize);
}
return false; return false;
} }
@ -90,6 +110,12 @@ RsItem *RsFileItemSerialiser::deserialise(void *data, uint32_t *pktsize)
case RS_PKT_SUBTYPE_FI_DATA: case RS_PKT_SUBTYPE_FI_DATA:
return deserialiseData(data, pktsize); return deserialiseData(data, pktsize);
break; break;
case RS_PKT_SUBTYPE_FI_CHUNK_MAP_REQUEST:
return deserialiseChunkMapReq(data, pktsize);
break;
case RS_PKT_SUBTYPE_FI_CHUNK_MAP:
return deserialiseChunkMap(data, pktsize);
break;
default: default:
return NULL; return NULL;
break; break;
@ -238,7 +264,25 @@ void RsFileData::clear()
{ {
fd.TlvClear(); fd.TlvClear();
} }
std::ostream &RsFileChunkMap::print(std::ostream &out, uint16_t indent)
{
printRsItemBase(out, "RsFileChunkMap", indent);
uint16_t int_Indent = indent + 2;
printIndent(out, int_Indent); out << "PeerId: " << PeerId() << std::endl ;
printIndent(out, int_Indent); out << " hash: " << hash << std::endl ;
printIndent(out, int_Indent); out << "chunks: " << (void*)(compressed_map._map[0]) << "..." << std::endl ;
printRsItemEnd(out, "RsFileChunkMap", indent);
return out;
}
std::ostream &RsFileChunkMapRequest::print(std::ostream &out, uint16_t indent)
{
printRsItemBase(out, "RsFileChunkMapRequest", indent);
uint16_t int_Indent = indent + 2;
printIndent(out, int_Indent); out << "PeerId: " << PeerId() << std::endl ;
printIndent(out, int_Indent); out << " hash: " << hash << std::endl ;
printRsItemEnd(out, "RsFileChunkMapRequest", indent);
return out;
}
std::ostream &RsFileData::print(std::ostream &out, uint16_t indent) std::ostream &RsFileData::print(std::ostream &out, uint16_t indent)
{ {
printRsItemBase(out, "RsFileData", indent); printRsItemBase(out, "RsFileData", indent);
@ -344,7 +388,207 @@ RsFileData *RsFileItemSerialiser::deserialiseData(void *data, uint32_t *pktsize)
return item; return item;
} }
uint32_t RsFileItemSerialiser::sizeChunkMapReq(RsFileChunkMapRequest *item)
{
uint32_t s = 8; /* header */
s += 1 ; // is_client
s += GetTlvStringSize(item->hash) ; // hash
return s;
}
uint32_t RsFileItemSerialiser::sizeChunkMap(RsFileChunkMap *item)
{
uint32_t s = 8; /* header */
s += 1 ; // is_client
s += GetTlvStringSize(item->hash) ; // hash
s += 4 ; // compressed map size
s += 4 * item->compressed_map._map.size() ; // compressed chunk map
return s;
}
/* serialise the data to the buffer */
bool RsFileItemSerialiser::serialiseChunkMapReq(RsFileChunkMapRequest *item, void *data, uint32_t *pktsize)
{
uint32_t tlvsize = sizeChunkMapReq(item);
uint32_t offset = 0;
if (*pktsize < tlvsize)
return false; /* not enough space */
*pktsize = tlvsize;
bool ok = true;
ok &= setRsItemHeader(data, tlvsize, item->PacketId(), tlvsize);
#ifdef RSSERIAL_DEBUG
std::cerr << "RsFileItemSerialiser::serialiseData() Header: " << ok << std::endl;
#endif
/* skip the header */
offset += 8;
/* add mandatory parts first */
ok &= setRawUInt8(data, tlvsize, &offset, item->is_client);
ok &= SetTlvString(data, tlvsize, &offset, TLV_TYPE_STR_VALUE, item->hash);
if (offset != tlvsize)
{
ok = false;
#ifdef RSSERIAL_DEBUG
std::cerr << "RsFileItemSerialiser::serialiseData() Size Error! " << std::endl;
#endif
}
return ok;
}
bool RsFileItemSerialiser::serialiseChunkMap(RsFileChunkMap *item, void *data, uint32_t *pktsize)
{
uint32_t tlvsize = sizeChunkMap(item);
uint32_t offset = 0;
if (*pktsize < tlvsize)
return false; /* not enough space */
*pktsize = tlvsize;
bool ok = true;
ok &= setRsItemHeader(data, tlvsize, item->PacketId(), tlvsize);
#ifdef RSSERIAL_DEBUG
std::cerr << "RsFileItemSerialiser::serialiseData() Header: " << ok << std::endl;
#endif
/* skip the header */
offset += 8;
/* add mandatory parts first */
ok &= setRawUInt8(data, tlvsize, &offset, item->is_client);
ok &= SetTlvString(data, tlvsize, &offset, TLV_TYPE_STR_VALUE, item->hash);
ok &= setRawUInt32(data, tlvsize, &offset, item->compressed_map._map.size());
for(uint32_t i=0;i<item->compressed_map._map.size();++i)
ok &= setRawUInt32(data, tlvsize, &offset, item->compressed_map._map[i]);
if (offset != tlvsize)
{
ok = false;
#ifdef RSSERIAL_DEBUG
std::cerr << "RsFileItemSerialiser::serialiseData() Size Error! " << std::endl;
#endif
}
return ok;
}
RsFileChunkMapRequest *RsFileItemSerialiser::deserialiseChunkMapReq(void *data, uint32_t *pktsize)
{
/* get the type and size */
uint32_t rstype = getRsItemId(data);
uint32_t rssize = getRsItemSize(data);
uint32_t offset = 0;
if ((RS_PKT_VERSION1 != getRsItemVersion(rstype)) ||
(RS_PKT_CLASS_BASE != getRsItemClass(rstype)) ||
(RS_PKT_TYPE_FILE != getRsItemType(rstype)) ||
(RS_PKT_SUBTYPE_FI_CHUNK_MAP_REQUEST != getRsItemSubType(rstype)))
{
return NULL; /* wrong type */
}
if (*pktsize < rssize) /* check size */
return NULL; /* not enough data */
/* set the packet length */
*pktsize = rssize;
bool ok = true;
/* ready to load */
RsFileChunkMapRequest *item = new RsFileChunkMapRequest();
item->clear();
/* skip the header */
offset += 8;
uint8_t tmp ;
ok &= getRawUInt8(data, *pktsize, &offset, &tmp); item->is_client = tmp;
ok &= GetTlvString(data, *pktsize, &offset, TLV_TYPE_STR_VALUE, item->hash); // file hash
if (offset != rssize)
{
/* error */
delete item;
return NULL;
}
if (!ok)
{
delete item;
return NULL;
}
return item;
}
RsFileChunkMap *RsFileItemSerialiser::deserialiseChunkMap(void *data, uint32_t *pktsize)
{
/* get the type and size */
uint32_t rstype = getRsItemId(data);
uint32_t rssize = getRsItemSize(data);
uint32_t offset = 0;
if ((RS_PKT_VERSION1 != getRsItemVersion(rstype)) ||
(RS_PKT_CLASS_BASE != getRsItemClass(rstype)) ||
(RS_PKT_TYPE_FILE != getRsItemType(rstype)) ||
(RS_PKT_SUBTYPE_FI_CHUNK_MAP != getRsItemSubType(rstype)))
{
return NULL; /* wrong type */
}
if (*pktsize < rssize) /* check size */
return NULL; /* not enough data */
/* set the packet length */
*pktsize = rssize;
bool ok = true;
/* ready to load */
RsFileChunkMap *item = new RsFileChunkMap();
item->clear();
/* skip the header */
offset += 8;
uint8_t tmp ;
ok &= getRawUInt8(data, *pktsize, &offset, &tmp); item->is_client = tmp;
ok &= GetTlvString(data, *pktsize, &offset, TLV_TYPE_STR_VALUE, item->hash); // file hash
uint32_t size =0;
ok &= getRawUInt32(data, *pktsize, &offset, &size);
if(ok)
{
item->compressed_map._map.resize(size) ;
for(uint32_t i=0;i<size && ok;++i)
ok &= getRawUInt32(data, *pktsize, &offset, &(item->compressed_map._map[i]));
}
if (offset != rssize)
{
/* error */
delete item;
return NULL;
}
if (!ok)
{
delete item;
return NULL;
}
return item;
}
/*************************************************************************/ /*************************************************************************/
/*************************************************************************/ /*************************************************************************/
@ -696,7 +940,6 @@ bool RsServiceSerialiser::serialise(RsItem *i, void *data, uint32_t *pktsize)
#ifdef RSSERIAL_DEBUG #ifdef RSSERIAL_DEBUG
std::cerr << "tlvsize : " << tlvsize << std::endl; std::cerr << "tlvsize : " << tlvsize << std::endl;
#endif #endif
uint32_t offset = 0;
if (*pktsize < tlvsize) if (*pktsize < tlvsize)
return false; /* not enough space */ return false; /* not enough space */

View File

@ -28,15 +28,20 @@
#include <map> #include <map>
#include <rsiface/rstypes.h>
#include "serialiser/rsserial.h" #include "serialiser/rsserial.h"
#include "serialiser/rstlvtypes.h" #include "serialiser/rstlvtypes.h"
const uint8_t RS_PKT_TYPE_FILE = 0x01; const uint8_t RS_PKT_TYPE_FILE = 0x01;
const uint8_t RS_PKT_TYPE_CACHE = 0x02; const uint8_t RS_PKT_TYPE_CACHE = 0x02;
const uint8_t RS_PKT_SUBTYPE_FI_REQUEST = 0x01; const uint8_t RS_PKT_SUBTYPE_FI_REQUEST = 0x01;
const uint8_t RS_PKT_SUBTYPE_FI_DATA = 0x02; const uint8_t RS_PKT_SUBTYPE_FI_DATA = 0x02;
const uint8_t RS_PKT_SUBTYPE_FI_TRANSFER = 0x03; const uint8_t RS_PKT_SUBTYPE_FI_TRANSFER = 0x03;
const uint8_t RS_PKT_SUBTYPE_FI_CHUNK_MAP_REQUEST = 0x04;
const uint8_t RS_PKT_SUBTYPE_FI_CHUNK_MAP = 0x05;
const uint8_t RS_PKT_SUBTYPE_FI_CRC32_MAP_REQUEST = 0x06;
const uint8_t RS_PKT_SUBTYPE_FI_CRC32_MAP = 0x07;
const uint8_t RS_PKT_SUBTYPE_CACHE_ITEM = 0x01; const uint8_t RS_PKT_SUBTYPE_CACHE_ITEM = 0x01;
const uint8_t RS_PKT_SUBTYPE_CACHE_REQUEST = 0x02; const uint8_t RS_PKT_SUBTYPE_CACHE_REQUEST = 0x02;
@ -77,32 +82,68 @@ std::ostream &print(std::ostream &out, uint16_t indent = 0);
RsTlvFileData fd; RsTlvFileData fd;
}; };
class RsFileChunkMapRequest: public RsItem
{
public:
RsFileChunkMapRequest()
:RsItem(RS_PKT_VERSION1, RS_PKT_CLASS_BASE, RS_PKT_TYPE_FILE, RS_PKT_SUBTYPE_FI_CHUNK_MAP_REQUEST)
{}
virtual ~RsFileChunkMapRequest() {}
virtual void clear() {}
bool is_client ; // is the request for a client, or a server ?
std::string hash ; // hash of the file for which we request the chunk map
std::ostream &print(std::ostream &out, uint16_t indent = 0);
};
class RsFileChunkMap: public RsItem
{
public:
RsFileChunkMap()
:RsItem(RS_PKT_VERSION1, RS_PKT_CLASS_BASE, RS_PKT_TYPE_FILE, RS_PKT_SUBTYPE_FI_CHUNK_MAP)
{}
virtual ~RsFileChunkMap() {}
virtual void clear() {}
bool is_client ; // is the request for a client, or a server ?
std::string hash ; // hash of the file for which we request the chunk map
CompressedChunkMap compressed_map ; // Chunk map of the file.
std::ostream &print(std::ostream &out, uint16_t indent = 0);
};
/**************************************************************************/ /**************************************************************************/
class RsFileItemSerialiser: public RsSerialType class RsFileItemSerialiser: public RsSerialType
{ {
public: public:
RsFileItemSerialiser() RsFileItemSerialiser()
:RsSerialType(RS_PKT_VERSION1, RS_PKT_CLASS_BASE, :RsSerialType(RS_PKT_VERSION1, RS_PKT_CLASS_BASE,
RS_PKT_TYPE_FILE) RS_PKT_TYPE_FILE)
{ return; } { return; }
virtual ~RsFileItemSerialiser() { return; } virtual ~RsFileItemSerialiser() { return; }
virtual uint32_t size(RsItem *); virtual uint32_t size(RsItem *);
virtual bool serialise (RsItem *item, void *data, uint32_t *size); virtual bool serialise (RsItem *item, void *data, uint32_t *size);
virtual RsItem * deserialise(void *data, uint32_t *size); virtual RsItem * deserialise(void *data, uint32_t *size);
private: private:
/* sub types */ /* sub types */
virtual uint32_t sizeReq(RsFileRequest *); virtual uint32_t sizeReq(RsFileRequest *);
virtual bool serialiseReq (RsFileRequest *item, void *data, uint32_t *size); virtual uint32_t sizeData(RsFileData *);
virtual RsFileRequest * deserialiseReq(void *data, uint32_t *size); virtual uint32_t sizeChunkMapReq(RsFileChunkMapRequest *);
virtual uint32_t sizeChunkMap(RsFileChunkMap *);
virtual uint32_t sizeData(RsFileData *); virtual bool serialiseReq (RsFileRequest *item, void *data, uint32_t *size);
virtual bool serialiseData (RsFileData *item, void *data, uint32_t *size); virtual bool serialiseData (RsFileData *item, void *data, uint32_t *size);
virtual RsFileData * deserialiseData(void *data, uint32_t *size); virtual bool serialiseChunkMapReq(RsFileChunkMapRequest *item, void *data, uint32_t *size);
virtual bool serialiseChunkMap(RsFileChunkMap *item, void *data, uint32_t *size);
virtual RsFileRequest *deserialiseReq(void *data, uint32_t *size);
virtual RsFileChunkMapRequest *deserialiseChunkMapReq(void *data, uint32_t *size);
virtual RsFileChunkMap *deserialiseChunkMap(void *data, uint32_t *size);
virtual RsFileData *deserialiseData(void *data, uint32_t *size);
}; };
/**************************************************************************/ /**************************************************************************/

View File

@ -1202,7 +1202,7 @@ void p3turtle::sendFileData(const std::string& peerId, const std::string& hash,
sendItem(item) ; sendItem(item) ;
} }
void p3turtle::sendChunkMapRequest(const std::string& peerId,const std::string& hash) void p3turtle::sendChunkMapRequest(const std::string& peerId,const std::string& hash,bool is_client)
{ {
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/ RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
@ -1229,16 +1229,18 @@ void p3turtle::sendChunkMapRequest(const std::string& peerId,const std::string&
if(tunnel.local_src == ownid) if(tunnel.local_src == ownid)
{ {
assert(!is_client) ;
item->direction = RsTurtleGenericTunnelItem::DIRECTION_SERVER ; item->direction = RsTurtleGenericTunnelItem::DIRECTION_SERVER ;
item->PeerId(tunnel.local_dst) ; item->PeerId(tunnel.local_dst) ;
} }
else if(tunnel.local_dst == ownid) else if(tunnel.local_dst == ownid)
{ {
assert(is_client) ;
item->direction = RsTurtleGenericTunnelItem::DIRECTION_CLIENT ; item->direction = RsTurtleGenericTunnelItem::DIRECTION_CLIENT ;
item->PeerId(tunnel.local_src) ; item->PeerId(tunnel.local_src) ;
} }
else else
std::cerr << "p3turtle::sendChunkMap: consistency error!" << std::endl ; std::cerr << "p3turtle::sendChunkMapRequest: consistency error!" << std::endl ;
#ifdef P3TURTLE_DEBUG #ifdef P3TURTLE_DEBUG
std::cerr << "p3turtle: sending chunk map req to peer " << peerId << ", hash=0x" << hash << ") through tunnel " << (void*)item->tunnel_id << ", next peer=" << item->PeerId() << std::endl ; std::cerr << "p3turtle: sending chunk map req to peer " << peerId << ", hash=0x" << hash << ") through tunnel " << (void*)item->tunnel_id << ", next peer=" << item->PeerId() << std::endl ;
@ -1246,7 +1248,7 @@ void p3turtle::sendChunkMapRequest(const std::string& peerId,const std::string&
sendItem(item) ; sendItem(item) ;
} }
void p3turtle::sendChunkMap(const std::string& peerId,const std::string& hash,const CompressedChunkMap& cmap) void p3turtle::sendChunkMap(const std::string& peerId,const std::string& hash,const CompressedChunkMap& cmap,bool is_client)
{ {
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/ RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
@ -1274,11 +1276,13 @@ void p3turtle::sendChunkMap(const std::string& peerId,const std::string& hash,co
if(tunnel.local_src == ownid) if(tunnel.local_src == ownid)
{ {
assert(!is_client) ;
item->direction = RsTurtleGenericTunnelItem::DIRECTION_SERVER ; item->direction = RsTurtleGenericTunnelItem::DIRECTION_SERVER ;
item->PeerId(tunnel.local_dst) ; item->PeerId(tunnel.local_dst) ;
} }
else if(tunnel.local_dst == ownid) else if(tunnel.local_dst == ownid)
{ {
assert(is_client) ;
item->direction = RsTurtleGenericTunnelItem::DIRECTION_CLIENT ; item->direction = RsTurtleGenericTunnelItem::DIRECTION_CLIENT ;
item->PeerId(tunnel.local_src) ; item->PeerId(tunnel.local_src) ;
} }

View File

@ -284,10 +284,10 @@ class p3turtle: public p3Service, public pqiMonitor, public RsTurtle,/* public f
void sendFileData(const std::string& peerId, const std::string& hash, uint64_t size, uint64_t baseoffset, uint32_t chunksize, void *data) ; void sendFileData(const std::string& peerId, const std::string& hash, uint64_t size, uint64_t baseoffset, uint32_t chunksize, void *data) ;
/// Send a request for the chunk map of this file to the given peer /// Send a request for the chunk map of this file to the given peer
void sendChunkMapRequest(const std::string& peerId, const std::string& hash) ; void sendChunkMapRequest(const std::string& peerId, const std::string& hash,bool is_client) ;
/// Send a chunk map of this file to the given peer /// Send a chunk map of this file to the given peer
void sendChunkMap(const std::string& peerId, const std::string& hash,const CompressedChunkMap& cmap) ; void sendChunkMap(const std::string& peerId, const std::string& hash,const CompressedChunkMap& cmap,bool is_client) ;
/// Send a request for the crc32 map of this file to the given peer /// Send a request for the crc32 map of this file to the given peer
void sendCRC32MapRequest(const std::string& peerId, const std::string& hash) ; void sendCRC32MapRequest(const std::string& peerId, const std::string& hash) ;