mirror of
https://github.com/RetroShare/RetroShare.git
synced 2025-06-06 21:58:57 -04:00
Added deferred checking of chunks during file transfer. Chunk sha1 sums are requested from the sources and checked against the downloaded data.
Validated chunks are shared with other peers. Force check is now very simple, since it just turns all chunks into "needs checking" mode and the sums are requested from the sources. Sources maintain a temporary cache of chunks. Since sums are requested sparsely, this should not affect the sources in terms of performance. We can still imagine precomputing and saving the sha1 of chunks while hashing them. For backward compatibility reasons, the following has been set up *temporarily* in this version: - unvalidated chunks are still considered as already obtained, and are shared and saved - force check has been disabled - final file check is maintained - in case the file check fails, the old checking mode will be used. All changes for the next version are kept under the define 'USE_NEW_CHUNK_CHECKING_CODE', which will be made the default in a few weeks. At start, I expect most chunks to stay yellow during download, until most sources are able to provide chunk hashes. git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@5019 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
parent
7ab5b54266
commit
889a2b2433
31 changed files with 1540 additions and 35 deletions
|
@ -42,6 +42,9 @@ const uint32_t DMULTIPLEX_MIN = 10; /* 1ms sleep */
|
|||
const uint32_t DMULTIPLEX_MAX = 1000; /* 1 sec sleep */
|
||||
const double DMULTIPLEX_RELAX = 0.5; /* ??? */
|
||||
|
||||
static const uint32_t MAX_CHECKING_CHUNK_WAIT_DELAY = 120 ; //! TTL for an inactive chunk. In seconds: handlePendingCrcRequests() compares it against time(NULL) stamps before re-asking a source.
|
||||
const uint32_t MAX_SIMULTANEOUS_CRC_REQUESTS = 20 ; //! Bound checked by handlePendingCrcRequests() to stop sending chunk checksum requests within one pass.
|
||||
|
||||
/******
|
||||
* #define MPLEX_DEBUG 1
|
||||
*****/
|
||||
|
@ -57,6 +60,7 @@ const uint32_t FT_DATA_REQ = 0x0002; // data request to be treated
|
|||
const uint32_t FT_CLIENT_CHUNK_MAP_REQ = 0x0003; // chunk map request to be treated by client
|
||||
const uint32_t FT_SERVER_CHUNK_MAP_REQ = 0x0004; // chunk map request to be treated by server
|
||||
const uint32_t FT_CRC32MAP_REQ = 0x0005; // crc32 map request to be treated by server
|
||||
const uint32_t FT_CLIENT_CHUNK_CRC_REQ = 0x0006; // chunk sha1 crc request to be treated
|
||||
|
||||
ftRequest::ftRequest(uint32_t type, std::string peerId, std::string hash, uint64_t size, uint64_t offset, uint32_t chunk, void *data)
|
||||
:mType(type), mPeerId(peerId), mHash(hash), mSize(size),
|
||||
|
@ -281,7 +285,19 @@ bool ftDataMultiplex::recvCRC32MapRequest(const std::string& peerId, const std::
|
|||
|
||||
return true;
|
||||
}
|
||||
bool ftDataMultiplex::recvSingleChunkCrcRequest(const std::string& peerId, const std::string& hash,uint32_t chunk_number)
|
||||
{
|
||||
#ifdef MPLEX_DEBUG
|
||||
std::cerr << "ftDataMultiplex::recvChunkMapRequest() Server Recv";
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
/* Store in Queue */
|
||||
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
|
||||
|
||||
mRequestQueue.push_back(ftRequest(FT_CLIENT_CHUNK_CRC_REQ,peerId,hash,0,0,chunk_number,NULL));
|
||||
|
||||
return true;
|
||||
}
|
||||
class CRC32Thread: public RsThread
|
||||
{
|
||||
public:
|
||||
|
@ -387,6 +403,14 @@ bool ftDataMultiplex::doWork()
|
|||
handleRecvCRC32MapRequest(req.mPeerId,req.mHash) ;
|
||||
break ;
|
||||
|
||||
case FT_CLIENT_CHUNK_CRC_REQ:
|
||||
#ifdef MPLEX_DEBUG
|
||||
std::cerr << "ftDataMultiplex::doWork() Handling FT_CLIENT_CHUNK_CRC_REQ";
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
handleRecvChunkCrcRequest(req.mPeerId,req.mHash,req.mChunk) ;
|
||||
break ;
|
||||
|
||||
default:
|
||||
#ifdef MPLEX_DEBUG
|
||||
std::cerr << "ftDataMultiplex::doWork() Ignoring UNKNOWN";
|
||||
|
@ -465,6 +489,93 @@ bool ftDataMultiplex::doWork()
|
|||
return true;
|
||||
}
|
||||
|
||||
bool ftDataMultiplex::recvSingleChunkCrc(const std::string& /*peerId*/, const std::string& hash,uint32_t chunk_number,const Sha1CheckSum& crc)
|
||||
{
|
||||
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
|
||||
|
||||
//#ifdef MPLEX_DEBUG
|
||||
std::cerr << "ftDataMultiplex::recvSingleChunkCrc() Received crc of file " << hash << ", chunk " << chunk_number << ", crc=" << crc.toStdString() << std::endl;
|
||||
//#endif
|
||||
|
||||
std::map<std::string, ftClient>::iterator it = mClients.find(hash);
|
||||
|
||||
if(it == mClients.end())
|
||||
{
|
||||
std::cerr << "ftDataMultiplex::recvSingleChunkCrc() ERROR: No matching Client for CRC. This is an error. " << hash << " !" << std::endl;
|
||||
/* error */
|
||||
return false;
|
||||
}
|
||||
|
||||
// store in the cache as well
|
||||
|
||||
Sha1CacheEntry& sha1cache(_cached_sha1maps[hash]) ;
|
||||
|
||||
if(sha1cache._map.size() == 0)
|
||||
sha1cache._map = Sha1Map(it->second.mCreator->fileSize(),ChunkMap::CHUNKMAP_FIXED_CHUNK_SIZE) ;
|
||||
|
||||
sha1cache._map.set(chunk_number,crc) ;
|
||||
|
||||
// remove this chunk from the request list as well.
|
||||
|
||||
std::map<uint32_t,ChunkCheckSumSourceList>::iterator it2(sha1cache._to_ask.find(chunk_number)) ;
|
||||
|
||||
if(it2 != sha1cache._to_ask.end())
|
||||
sha1cache._to_ask.erase(it2) ;
|
||||
|
||||
sha1cache._received.push_back(chunk_number) ;
|
||||
|
||||
//#ifdef MPLEX_DEBUG
|
||||
std::cerr << "ftDataMultiplex::recvSingleChunkCrc() stored in cache. " << std::endl;
|
||||
//#endif
|
||||
|
||||
return true ;
|
||||
}
|
||||
|
||||
bool ftDataMultiplex::dispatchReceivedChunkCheckSum()
|
||||
{
|
||||
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
|
||||
|
||||
uint32_t MAX_CHECKSUM_CHECK_PER_FILE = 25 ;
|
||||
|
||||
for(std::map<std::string,Sha1CacheEntry>::iterator it(_cached_sha1maps.begin());it!=_cached_sha1maps.end();++it)
|
||||
{
|
||||
ftFileCreator *client = NULL ;
|
||||
|
||||
for(uint32_t n=0;n<MAX_CHECKSUM_CHECK_PER_FILE && n < it->second._received.size();)
|
||||
{
|
||||
if(client == NULL)
|
||||
{
|
||||
std::map<std::string, ftClient>::iterator itc = mClients.find(it->first);
|
||||
|
||||
std::cerr << "ftDataMultiplex::dispatchReceivedChunkCheckSum(): treating hash " << it->first << std::endl;
|
||||
|
||||
if(itc == mClients.end())
|
||||
{
|
||||
std::cerr << "ftDataMultiplex::dispatchReceivedChunkCheckSum() ERROR: No matching Client for hash. This is an error. Hash=" << it->first << std::endl;
|
||||
/* error */
|
||||
break ;
|
||||
}
|
||||
else
|
||||
client = itc->second.mCreator ;
|
||||
}
|
||||
int chunk_number = it->second._received[n] ;
|
||||
|
||||
if(!it->second._map.isSet(chunk_number))
|
||||
{
|
||||
std::cerr << "ftDataMultiplex::dispatchReceivedChunkCheckSum() ERROR: chunk " << chunk_number << " is supposed to be initialized but it was not received !!" << std::endl;
|
||||
++n ;
|
||||
continue ;
|
||||
}
|
||||
std::cerr << "ftDataMultiplex::dispatchReceivedChunkCheckSum(): checking chunk " << chunk_number << " with hash " << it->second._map[chunk_number].toStdString() << std::endl;
|
||||
client->verifyChunk(chunk_number,it->second._map[chunk_number]) ;
|
||||
|
||||
it->second._received[n] = it->second._received.back() ;
|
||||
it->second._received.pop_back() ;
|
||||
}
|
||||
}
|
||||
return true ;
|
||||
}
|
||||
|
||||
bool ftDataMultiplex::recvCRC32Map(const std::string& /*peerId*/, const std::string& hash,const CRC32Map& crc_map)
|
||||
{
|
||||
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
|
||||
|
@ -708,6 +819,121 @@ bool ftDataMultiplex::handleRecvClientChunkMapRequest(const std::string& peerId,
|
|||
return true ;
|
||||
}
|
||||
|
||||
bool ftDataMultiplex::handleRecvChunkCrcRequest(const std::string& peerId, const std::string& hash, uint32_t chunk_number)
|
||||
{
|
||||
// look into the sha1sum cache
|
||||
|
||||
std::cerr << "ftDataMultiplex::handleRecvChunkMapReq() looking for chunk " << chunk_number << " for hash " << hash << std::endl;
|
||||
|
||||
Sha1CheckSum crc ;
|
||||
bool found = false ;
|
||||
|
||||
{
|
||||
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
|
||||
|
||||
Sha1CacheEntry& sha1cache(_cached_sha1maps[hash]) ;
|
||||
sha1cache.last_activity = time(NULL) ; // update time_stamp
|
||||
|
||||
if(sha1cache._map.size() > 0 && sha1cache._map.isSet(chunk_number))
|
||||
{
|
||||
crc = sha1cache._map[chunk_number] ;
|
||||
found = true ;
|
||||
}
|
||||
}
|
||||
|
||||
if(found)
|
||||
{
|
||||
std::cerr << "ftDataMultiplex::handleRecvChunkMapReq() found in cache ! Sending " << crc.toStdString() << std::endl;
|
||||
mDataSend->sendSingleChunkCRC(peerId,hash,chunk_number,crc);
|
||||
return true ;
|
||||
}
|
||||
|
||||
std::map<std::string, ftFileProvider *>::iterator it ;
|
||||
std::string filename ;
|
||||
uint64_t filesize =0;
|
||||
found = true ;
|
||||
|
||||
// 1 - look into the list of servers.Not clients ! Clients dont' have verified data.
|
||||
{
|
||||
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
|
||||
|
||||
it = mServers.find(hash) ;
|
||||
|
||||
if(it == mServers.end())
|
||||
found = false ;
|
||||
}
|
||||
|
||||
// 2 - if not found, create a server.
|
||||
//
|
||||
if(!found)
|
||||
{
|
||||
//#ifdef MPLEX_DEBUG
|
||||
std::cerr << "ftDataMultiplex::handleRecvChunkMapReq() ERROR: No matching file Provider for hash " << hash ;
|
||||
std::cerr << std::endl;
|
||||
//#endif
|
||||
if(!handleSearchRequest(peerId,hash))
|
||||
return false ;
|
||||
|
||||
//#ifdef MPLEX_DEBUG
|
||||
std::cerr << "ftDataMultiplex::handleRecvChunkMapReq() A new file Provider has been made up for hash " << hash ;
|
||||
std::cerr << std::endl;
|
||||
//#endif
|
||||
}
|
||||
|
||||
{
|
||||
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
|
||||
it = mServers.find(hash) ;
|
||||
|
||||
if(it == mServers.end()) // handleSearchRequest should have filled mServers[hash], but we have been off-mutex since,
|
||||
{
|
||||
std::cerr << "Could definitely not find a provider for file " << hash << ". Maybe the file does not exist?" << std::endl;
|
||||
return false ; // so it's safer to check again.
|
||||
}
|
||||
else
|
||||
{
|
||||
filesize = it->second->fileSize() ;
|
||||
filename = it->second->fileName() ;
|
||||
}
|
||||
}
|
||||
|
||||
std::cerr << "Computing Sha1 for chunk " << chunk_number<< " of file " << filename << ", hash=" << hash << ", size=" << filesize << std::endl;
|
||||
|
||||
unsigned char *buf = new unsigned char[ChunkMap::CHUNKMAP_FIXED_CHUNK_SIZE] ;
|
||||
FILE *fd = fopen(filename.c_str(),"r") ;
|
||||
|
||||
if(fd == NULL)
|
||||
{
|
||||
std::cerr << "Cannot read file " << filename << ". Something's wrong!" << std::endl;
|
||||
delete buf ;
|
||||
return false ;
|
||||
}
|
||||
uint32_t len ;
|
||||
if(fseeko64(fd,(uint64_t)chunk_number * (uint64_t)ChunkMap::CHUNKMAP_FIXED_CHUNK_SIZE,SEEK_SET)!=0 || 0==(len = fread(buf,1,ChunkMap::CHUNKMAP_FIXED_CHUNK_SIZE,fd)))
|
||||
{
|
||||
std::cerr << "Cannot fseek/read from file " << filename << " at position " << (uint64_t)chunk_number * (uint64_t)ChunkMap::CHUNKMAP_FIXED_CHUNK_SIZE << std::endl;
|
||||
fclose(fd) ;
|
||||
}
|
||||
fclose(fd) ;
|
||||
|
||||
crc = RsDirUtil::sha1sum(buf,len) ;
|
||||
|
||||
// update cache
|
||||
{
|
||||
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
|
||||
|
||||
Sha1CacheEntry& sha1cache(_cached_sha1maps[hash]) ;
|
||||
|
||||
if(sha1cache._map.size() == 0)
|
||||
sha1cache._map = Sha1Map(filesize,ChunkMap::CHUNKMAP_FIXED_CHUNK_SIZE) ;
|
||||
|
||||
sha1cache._map.set(chunk_number,crc) ;
|
||||
}
|
||||
std::cerr << "Sending CRC of chunk " << chunk_number<< " of file " << filename << ", hash=" << hash << ", size=" << filesize << ", crc=" << crc.toStdString() << std::endl;
|
||||
|
||||
mDataSend->sendSingleChunkCRC(peerId,hash,chunk_number,crc);
|
||||
return true ;
|
||||
}
|
||||
|
||||
bool ftDataMultiplex::handleRecvServerChunkMapRequest(const std::string& peerId, const std::string& hash)
|
||||
{
|
||||
CompressedChunkMap cmap ;
|
||||
|
@ -905,6 +1131,44 @@ bool ftDataMultiplex::sendCRC32MapRequest(const std::string& peer_id,const std::
|
|||
{
|
||||
return mDataSend->sendCRC32MapRequest(peer_id,hash);
|
||||
}
|
||||
bool ftDataMultiplex::sendSingleChunkCRCRequests(const std::string& hash, const std::vector<std::pair<uint32_t,std::list<std::string> > >& to_ask)
|
||||
{
|
||||
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
|
||||
|
||||
// Put all requested chunks in the request queue.
|
||||
|
||||
Sha1CacheEntry& ce(_cached_sha1maps[hash]) ;
|
||||
|
||||
for(uint32_t i=0;i<to_ask.size();++i)
|
||||
{
|
||||
ChunkCheckSumSourceList& list(ce._to_ask[to_ask[i].first]) ;
|
||||
|
||||
for(std::list<std::string>::const_iterator it(to_ask[i].second.begin());it!=to_ask[i].second.end();++it)
|
||||
list[*it] = 0 ;
|
||||
}
|
||||
return true ;
|
||||
}
|
||||
|
||||
void ftDataMultiplex::handlePendingCrcRequests()
|
||||
{
|
||||
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
|
||||
|
||||
time_t now = time(NULL) ;
|
||||
uint32_t n=0 ;
|
||||
|
||||
for(std::map<std::string,Sha1CacheEntry>::iterator it(_cached_sha1maps.begin());it!=_cached_sha1maps.end();++it)
|
||||
for(std::map<uint32_t,ChunkCheckSumSourceList>::iterator it2(it->second._to_ask.begin());it2!=it->second._to_ask.end();++it2)
|
||||
for(std::map<std::string,time_t>::iterator it3(it2->second.begin());it3!=it2->second.end();++it3)
|
||||
if(it3->second + MAX_CHECKING_CHUNK_WAIT_DELAY < now) // do nothing, otherwise, ask again
|
||||
{
|
||||
mDataSend->sendSingleChunkCRCRequest(it3->first,it->first,it2->first);
|
||||
it3->second = now ;
|
||||
|
||||
if(++n > MAX_SIMULTANEOUS_CRC_REQUESTS)
|
||||
return ;
|
||||
}
|
||||
}
|
||||
|
||||
void ftDataMultiplex::deleteUnusedServers()
|
||||
{
|
||||
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue