Added dynamic choose of sources for chunk crc requests. Fixes the bug that would let

a transfer not finish if the original surce for a crc is not here anymore.

If you have a unfinished transfer do a force-check after restart to get back chunks that 
where still on verificaiton stage.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@5315 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2012-07-19 20:52:04 +00:00
parent b52fc7708f
commit 24f8ee6222
7 changed files with 108 additions and 34 deletions

View File

@ -211,25 +211,13 @@ void ChunkMap::updateTotalDownloaded()
}
}
void ChunkMap::getChunksToCheck(std::vector<std::pair<uint32_t,std::list<std::string> > >& chunks_crc_to_ask)
void ChunkMap::getChunksToCheck(std::vector<uint32_t>& chunks_crc_to_ask)
{
chunks_crc_to_ask.clear() ;
for(uint32_t i=0;i<_chunks_checking_queue.size();)
{
std::list<std::string> peers ;
for(std::map<std::string,SourceChunksInfo>::const_iterator it2(_peers_chunks_availability.begin());it2!=_peers_chunks_availability.end();++it2)
if(it2->second.cmap[_chunks_checking_queue[i]])
peers.push_back(it2->first) ;
if(peers.empty()) // no peers => can't ask!
{
++i ;
continue ;
}
chunks_crc_to_ask.push_back(std::pair<uint32_t,std::list<std::string> >(_chunks_checking_queue[i],peers)) ;
chunks_crc_to_ask.push_back(_chunks_checking_queue[i]) ;
// remove that chunk from the queue
@ -528,6 +516,15 @@ SourceChunksInfo *ChunkMap::getSourceChunksInfo(const std::string& peer_id)
return &(it->second) ;
}
void ChunkMap::getSourcesList(uint32_t chunk_number,std::vector<std::string>& sources)
{
sources.clear() ;
for(std::map<std::string,SourceChunksInfo>::const_iterator it(_peers_chunks_availability.begin());it!=_peers_chunks_availability.end();++it)
if(it->second.cmap[chunk_number])
sources.push_back(it->first) ;
}
uint32_t ChunkMap::getAvailableChunk(const std::string& peer_id,bool& map_is_too_old)
{
// Quite simple strategy: Check for 1st availabe chunk for this peer starting from the given start location.

View File

@ -185,7 +185,10 @@ class ChunkMap
void setChunkCheckingResult(uint32_t chunk_number, bool succeed) ;
/// returns the current list of chunks to ask for a CRC, and a proposed source for each
void getChunksToCheck(std::vector<std::pair<unsigned int, std::list<std::string> > >& chunks_to_ask) ;
void getChunksToCheck(std::vector<uint32_t>& chunks_to_ask) ;
/// Get all available sources for this chunk
void getSourcesList(uint32_t chunk_number,std::vector<std::string>& sources) ;
/// sets all chunks to checking state
void forceCheck() ;

View File

@ -499,7 +499,7 @@ bool ftDataMultiplex::recvSingleChunkCrc(const std::string& peerId, const std::s
// remove this chunk from the request list as well.
Sha1CacheEntry& sha1cache(_cached_sha1maps[hash]) ;
std::map<uint32_t,ChunkCheckSumSourceList>::iterator it2(sha1cache._to_ask.find(chunk_number)) ;
std::map<uint32_t,std::pair<time_t,ChunkCheckSumSourceList> >::iterator it2(sha1cache._to_ask.find(chunk_number)) ;
if(it2 != sha1cache._to_ask.end())
sha1cache._to_ask.erase(it2) ;
@ -1146,7 +1146,7 @@ bool ftDataMultiplex::sendCRC32MapRequest(const std::string& peer_id,const std::
{
return mDataSend->sendCRC32MapRequest(peer_id,hash);
}
bool ftDataMultiplex::sendSingleChunkCRCRequests(const std::string& hash, const std::vector<std::pair<uint32_t,std::list<std::string> > >& to_ask)
bool ftDataMultiplex::sendSingleChunkCRCRequests(const std::string& hash, const std::vector<uint32_t>& to_ask)
{
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
@ -1156,10 +1156,8 @@ bool ftDataMultiplex::sendSingleChunkCRCRequests(const std::string& hash, const
for(uint32_t i=0;i<to_ask.size();++i)
{
ChunkCheckSumSourceList& list(ce._to_ask[to_ask[i].first]) ;
for(std::list<std::string>::const_iterator it(to_ask[i].second.begin());it!=to_ask[i].second.end();++it)
list[*it] = 0 ;
std::pair<time_t,ChunkCheckSumSourceList>& list(ce._to_ask[to_ask[i]]) ;
list.first = 0 ; // set last request time to 0
}
return true ;
}
@ -1171,22 +1169,87 @@ void ftDataMultiplex::handlePendingCrcRequests()
time_t now = time(NULL) ;
uint32_t n=0 ;
// Go through the list of currently handled hashes. For each of them,
// look for pending chunk crc requests.
// - if the last request is too old, re-ask:
// - ask the file creator about the possible sources for this chunk => returns a list of active sources
// - among active sources, pick the one that has the smallest request time stamp, in the request list.
//
// With this, only active sources are querried.
//
for(std::map<std::string,Sha1CacheEntry>::iterator it(_cached_sha1maps.begin());it!=_cached_sha1maps.end();++it)
for(std::map<uint32_t,ChunkCheckSumSourceList>::iterator it2(it->second._to_ask.begin());it2!=it->second._to_ask.end();++it2)
for(std::map<std::string,time_t>::iterator it3(it2->second.begin());it3!=it2->second.end();++it3)
if(it3->second + MAX_CHECKING_CHUNK_WAIT_DELAY < now) // do nothing, otherwise, ask again
for(std::map<uint32_t,std::pair<time_t,ChunkCheckSumSourceList> >::iterator it2(it->second._to_ask.begin());it2!=it->second._to_ask.end();++it2)
if(it2->second.first + MAX_CHECKING_CHUNK_WAIT_DELAY < now) // is the last request old enough?
{
#ifdef MPLEX_DEBUG
std::cerr << "ftDataMultiplex::handlePendingCrcRequests(): Requesting sources for chunk " << it2->first << ", hash " << it->first << std::endl;
#endif
// 0 - ask which sources can be used for this chunk
//
std::map<std::string,ftClient>::const_iterator it4(mClients.find(it->first)) ;
if(it4 == mClients.end())
continue ;
std::vector<std::string> sources ;
it4->second.mCreator->getSourcesList(it2->first,sources) ;
// 1 - go through all sources. Take the oldest one.
//
std::string best_source ;
time_t oldest_timestamp = now ;
for(uint32_t i=0;i<sources.size();++i)
{
#ifdef MPLEX_DEBUG
std::cerr << "ftDataMultiplex::handlePendingCrcRequests(): Asking crc of chunk " << it2->first << " to peer " << it3->first << " for hash " << it->first << std::endl;
std::cerr << "ftDataMultiplex::handlePendingCrcRequests(): Examining source " << sources[i] << std::endl;
#endif
mDataSend->sendSingleChunkCRCRequest(it3->first,it->first,it2->first);
it3->second = now ;
std::map<std::string,time_t>::const_iterator it3(it2->second.second.find(sources[i])) ;
if(it3 == it2->second.second.end()) // source not found. So this one is surely the oldest one to have been requested.
{
#ifdef MPLEX_DEBUG
std::cerr << "ftDataMultiplex::handlePendingCrcRequests(): not found! So using it directly." << std::endl;
#endif
best_source = sources[i] ;
break ;
}
else if(it3->second <= oldest_timestamp) // do nothing, otherwise, ask again
{
#ifdef MPLEX_DEBUG
std::cerr << "ftDataMultiplex::handlePendingCrcRequests(): not found! So using it directly." << std::endl;
#endif
best_source = sources[i] ;
oldest_timestamp = it3->second ;
}
#ifdef MPLEX_DEBUG
else
std::cerr << "ftDataMultiplex::handlePendingCrcRequests(): Source too recently used! So using it directly." << std::endl;
#endif
}
if(best_source != "")
{
#ifdef MPLEX_DEBUG
std::cerr << "ftDataMultiplex::handlePendingCrcRequests(): Asking crc of chunk " << it2->first << " to peer " << best_source << " for hash " << it->first << std::endl;
#endif
// Use the source to ask the CRC.
//
// sendSingleChunkCRCRequest(peer_id, hash, chunk_id)
//
mDataSend->sendSingleChunkCRCRequest(best_source,it->first,it2->first);
it2->second.second[best_source] = now ;
it2->second.first = now ;
if(++n > MAX_SIMULTANEOUS_CRC_REQUESTS)
return ;
break ; // go to next chunk. Don't ask the same chunk to multiple sources.
}
#ifdef MPLEX_DEBUG
else
std::cerr << "ftDataMultiplex::handlePendingCrcRequests(): no source for chunk " << it2->first << std::endl;
#endif
}
}
void ftDataMultiplex::deleteUnusedServers()

View File

@ -86,7 +86,7 @@ class Sha1CacheEntry
Sha1Map _map ; // Map of available sha1 sums for every chunk.
time_t last_activity ; // This is used for removing unused entries.
std::vector<uint32_t> _received ; // received chunk ids. To bedispatched.
std::map<uint32_t,ChunkCheckSumSourceList> _to_ask ; // Chunks to ask to sources.
std::map<uint32_t,std::pair<time_t,ChunkCheckSumSourceList> > _to_ask ; // Chunks to ask to sources.
};
class ftDataMultiplex: public ftDataRecv, public RsQueueThread
@ -128,7 +128,7 @@ class ftDataMultiplex: public ftDataRecv, public RsQueueThread
bool computeAndSendCRC32Map(const std::string& peerId, const std::string& hash) ;
/* called from a separate thread */
bool sendSingleChunkCRCRequests(const std::string& hash, const std::vector<std::pair<uint32_t, std::list<std::string > > >& to_ask) ;
bool sendSingleChunkCRCRequests(const std::string& hash, const std::vector<uint32_t>& to_ask) ;
bool dispatchReceivedChunkCheckSum() ;

View File

@ -616,7 +616,14 @@ void ftFileCreator::forceCheck()
chunkMap.forceCheck();
}
void ftFileCreator::getChunksToCheck(std::vector<std::pair<uint32_t,std::list<std::string> > >& chunks_to_ask)
void ftFileCreator::getSourcesList(uint32_t chunk_num,std::vector<std::string>& sources)
{
RsStackMutex stack(ftcMutex); /********** STACK LOCKED MTX ******/
chunkMap.getSourcesList(chunk_num,sources) ;
}
void ftFileCreator::getChunksToCheck(std::vector<uint32_t>& chunks_to_ask)
{
RsStackMutex stack(ftcMutex); /********** STACK LOCKED MTX ******/

View File

@ -86,7 +86,7 @@ class ftFileCreator: public ftFileProvider
// Looks into the chunkmap for downloaded chunks that have not yet been certified.
// For each of them, returns the chunk number and a source peer to ask the CRC to.
//
void getChunksToCheck(std::vector<std::pair<uint32_t,std::list<std::string> > >& chunks_to_ask) ;
void getChunksToCheck(std::vector<uint32_t>& chunks_to_ask) ;
/*
* creation functions for FileCreator
@ -107,6 +107,10 @@ class ftFileCreator: public ftFileProvider
// removes the designated file source from the chunkmap.
void removeFileSource(const std::string& peer_id) ;
// Get all available sources for this chunk
//
void getSourcesList(uint32_t chunk_number,std::vector<std::string>& sources) ;
// Returns resets the time stamp of the last data receive.
time_t lastRecvTimeStamp() ;
void resetRecvTimeStamp() ;

View File

@ -431,7 +431,7 @@ bool ftTransferModule::queryInactive()
else
{
// request for CRCs to ask
std::vector<std::pair<uint32_t,std::list<std::string> > > chunks_to_ask ;
std::vector<uint32_t> chunks_to_ask ;
#ifdef FT_DEBUG
std::cerr << "ftTransferModule::queryInactive() : getting chunks to check." << std::endl;