Merge pull request #1214 from csoler/v0.6-FT

enabled aggressive re-request of pending slices at end of transfer, t…
This commit is contained in:
csoler 2018-03-17 18:25:47 +01:00 committed by GitHub
commit 143210412b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 93 additions and 27 deletions

View File

@ -39,6 +39,7 @@
static const uint32_t SOURCE_CHUNK_MAP_UPDATE_PERIOD = 60 ; //! TTL for chunkmap info static const uint32_t SOURCE_CHUNK_MAP_UPDATE_PERIOD = 60 ; //! TTL for chunkmap info
static const uint32_t INACTIVE_CHUNK_TIME_LAPSE = 3600 ; //! TTL for an inactive chunk static const uint32_t INACTIVE_CHUNK_TIME_LAPSE = 3600 ; //! TTL for an inactive chunk
static const uint32_t FT_CHUNKMAP_MAX_CHUNK_JUMP = 50 ; //! Maximum chunk jump in progressive DL mode static const uint32_t FT_CHUNKMAP_MAX_CHUNK_JUMP = 50 ; //! Maximum chunk jump in progressive DL mode
static const uint32_t FT_CHUNKMAP_MAX_SLICE_REASK_DELAY = 10 ; //! Maximum time to re-ask a slice to another peer at end of transfer
std::ostream& operator<<(std::ostream& o,const ftChunk& c) std::ostream& operator<<(std::ostream& o,const ftChunk& c)
{ {
@ -120,7 +121,7 @@ void ChunkMap::setAvailabilityMap(const CompressedChunkMap& map)
} }
} }
void ChunkMap::dataReceived(const ftChunk::ChunkId& cid) void ChunkMap::dataReceived(const ftChunk::OffsetInFile& cid)
{ {
// 1 - find which chunk contains the received data. // 1 - find which chunk contains the received data.
// //
@ -139,7 +140,7 @@ void ChunkMap::dataReceived(const ftChunk::ChunkId& cid)
return ; return ;
} }
std::map<ftChunk::ChunkId,uint32_t>::iterator it(itc->second._slices.find(cid)) ; std::map<ftChunk::OffsetInFile,ChunkDownloadInfo::SliceRequestInfo>::iterator it(itc->second._slices.find(cid)) ;
if(it == itc->second._slices.end()) if(it == itc->second._slices.end())
{ {
@ -150,8 +151,8 @@ void ChunkMap::dataReceived(const ftChunk::ChunkId& cid)
return ; return ;
} }
_total_downloaded += it->second ; _total_downloaded += it->second.size ;
itc->second._remains -= it->second ; itc->second._remains -= it->second.size ;
itc->second._slices.erase(it) ; itc->second._slices.erase(it) ;
itc->second._last_data_received = time(NULL) ; // update time stamp itc->second._last_data_received = time(NULL) ; // update time stamp
@ -256,6 +257,36 @@ void ChunkMap::setChunkCheckingResult(uint32_t chunk_number,bool check_succeeded
} }
} }
bool ChunkMap::reAskPendingChunk(const RsPeerId& peer_id,uint32_t size_hint,uint64_t& offset,uint32_t& size)
{
// make sure that we're at the end of the file. No need to be too greedy in the middle of it.
for(uint32_t i=0;i<_map.size();++i)
if(_map[i] == FileChunksInfo::CHUNK_OUTSTANDING)
return false ;
time_t now = time(NULL);
for(std::map<uint32_t,ChunkDownloadInfo>::iterator it(_slices_to_download.begin());it!=_slices_to_download.end();++it)
for(std::map<ftChunk::OffsetInFile,ChunkDownloadInfo::SliceRequestInfo >::iterator it2(it->second._slices.begin());it2!=it->second._slices.end();++it2)
if(it2->second.request_time + FT_CHUNKMAP_MAX_SLICE_REASK_DELAY < now && it2->second.peers.end()==it2->second.peers.find(peer_id))
{
offset = it2->first;
size = it2->second.size ;
#ifdef DEBUG_FTCHUNK
std::cerr << "*** ChunkMap::reAskPendingChunk: re-asking slice (" << offset << ", " << size << ") to peer " << peer_id << std::endl;
#endif
it2->second.request_time = now ;
it2->second.peers.insert(peer_id) ;
return true ;
}
return false ;
}
// Warning: a chunk may be empty, but still being downloaded, so asking new slices from it // Warning: a chunk may be empty, but still being downloaded, so asking new slices from it
// will produce slices of size 0. This happens at the end of each chunk. // will produce slices of size 0. This happens at the end of each chunk.
// --> I need to get slices from the next chunk, in such a case. // --> I need to get slices from the next chunk, in such a case.
@ -295,7 +326,7 @@ bool ChunkMap::getDataChunk(const RsPeerId& peer_id,uint32_t size_hint,ftChunk&
ChunkDownloadInfo& cdi(_slices_to_download[c]) ; ChunkDownloadInfo& cdi(_slices_to_download[c]) ;
falsafe_it = pit ; // let's keep this one just in case. falsafe_it = pit ; // let's keep this one just in case.
if(cdi._slices.rbegin() != cdi._slices.rend() && cdi._slices.rbegin()->second*0.7 <= (float)size_hint) if(cdi._slices.rbegin() != cdi._slices.rend() && cdi._slices.rbegin()->second.size*0.7 <= (float)size_hint)
{ {
it = pit ; it = pit ;
#ifdef DEBUG_FTCHUNK #ifdef DEBUG_FTCHUNK
@ -348,9 +379,14 @@ bool ChunkMap::getDataChunk(const RsPeerId& peer_id,uint32_t size_hint,ftChunk&
// Get the first slice of the chunk, that is at most of length size // Get the first slice of the chunk, that is at most of length size
// //
it->second.getSlice(size_hint,chunk) ; it->second.getSlice(size_hint,chunk) ;
_slices_to_download[chunk.offset/_chunk_size]._slices[chunk.id] = chunk.size ;
_slices_to_download[chunk.offset/_chunk_size]._last_data_received = time(NULL) ; _slices_to_download[chunk.offset/_chunk_size]._last_data_received = time(NULL) ;
ChunkDownloadInfo::SliceRequestInfo& r(_slices_to_download[chunk.offset/_chunk_size]._slices[chunk.id]);
r.size = chunk.size ;
r.request_time = time(NULL);
r.peers.insert(peer_id);
chunk.peer_id = peer_id ; chunk.peer_id = peer_id ;
#ifdef DEBUG_FTCHUNK #ifdef DEBUG_FTCHUNK
@ -362,7 +398,7 @@ bool ChunkMap::getDataChunk(const RsPeerId& peer_id,uint32_t size_hint,ftChunk&
return true ; return true ;
} }
void ChunkMap::removeInactiveChunks(std::vector<ftChunk::ChunkId>& to_remove) void ChunkMap::removeInactiveChunks(std::vector<ftChunk::OffsetInFile>& to_remove)
{ {
to_remove.clear() ; to_remove.clear() ;
time_t now = time(NULL) ; time_t now = time(NULL) ;
@ -377,7 +413,7 @@ void ChunkMap::removeInactiveChunks(std::vector<ftChunk::ChunkId>& to_remove)
// //
std::map<ChunkNumber,ChunkDownloadInfo>::iterator tmp(it) ; std::map<ChunkNumber,ChunkDownloadInfo>::iterator tmp(it) ;
for(std::map<ftChunk::ChunkId,uint32_t>::const_iterator it2(it->second._slices.begin());it2!=it->second._slices.end();++it2) for(std::map<ftChunk::OffsetInFile,ChunkDownloadInfo::SliceRequestInfo>::const_iterator it2(it->second._slices.begin());it2!=it->second._slices.end();++it2)
to_remove.push_back(it2->first) ; to_remove.push_back(it2->first) ;
_map[it->first] = FileChunksInfo::CHUNK_OUTSTANDING ; // reset the chunk _map[it->first] = FileChunksInfo::CHUNK_OUTSTANDING ; // reset the chunk

View File

@ -32,7 +32,7 @@ class ftController ;
class ftChunk class ftChunk
{ {
public: public:
typedef uint64_t ChunkId ; typedef uint64_t OffsetInFile ;
ftChunk():offset(0), size(0), id(0), ts(0),ref_cnt(NULL) {} ftChunk():offset(0), size(0), id(0), ts(0),ref_cnt(NULL) {}
@ -40,7 +40,7 @@ class ftChunk
uint64_t offset; // current offset of the slice uint64_t offset; // current offset of the slice
uint64_t size; // size remaining to download uint64_t size; // size remaining to download
ChunkId id ; // id of the chunk. Equal to the starting offset of the chunk OffsetInFile id ; // id of the chunk. Equal to the starting offset of the chunk
time_t ts; // time of last data received time_t ts; // time of last data received
int *ref_cnt; // shared counter of number of sub-blocks. Used when a slice gets split. int *ref_cnt; // shared counter of number of sub-blocks. Used when a slice gets split.
RsPeerId peer_id ; RsPeerId peer_id ;
@ -70,10 +70,17 @@ class Chunk
uint64_t _end ; // const uint64_t _end ; // const
}; };
class ChunkDownloadInfo struct ChunkDownloadInfo
{ {
public: public:
std::map<ftChunk::ChunkId,uint32_t> _slices ; struct SliceRequestInfo
{
uint32_t size ; // size of the slice
time_t request_time ; // last request time
std::set<RsPeerId> peers ; // peers the slice was requested to. Normally only one, except at the end of the file.
};
std::map<ftChunk::OffsetInFile,SliceRequestInfo> _slices ;
uint32_t _remains ; uint32_t _remains ;
time_t _last_data_received ; time_t _last_data_received ;
}; };
@ -109,24 +116,29 @@ class ChunkMap
/// destructor /// destructor
virtual ~ChunkMap() {} virtual ~ChunkMap() {}
/// Returns an slice of data to be asked to the peer within a chunk. /// Returns an slice of data to be asked to the peer within a chunk.
/// If a chunk is already been downloaded by this peer, take a slice at /// If a chunk is already been downloaded by this peer, take a slice at
/// the beginning of this chunk, or at least where it starts. /// the beginning of this chunk, or at least where it starts.
/// If not, randomly/streamly select a new chunk depending on the strategy. /// If not, randomly/streamly select a new chunk depending on the strategy.
/// adds an entry in the chunk_ids map, and sets up 1 interval for it. /// adds an entry in the chunk_ids map, and sets up 1 interval for it.
/// the chunk should be available from the designated peer. /// the chunk should be available from the designated peer.
/// On return: /// On return:
/// - source_chunk_map_needed = true if the source map should be asked /// - source_chunk_map_needed = true if the source map should be asked
virtual bool getDataChunk(const RsPeerId& peer_id,uint32_t size_hint,ftChunk& chunk,bool& source_chunk_map_needed) ; virtual bool getDataChunk(const RsPeerId& peer_id,uint32_t size_hint,ftChunk& chunk,bool& source_chunk_map_needed) ;
/// Notify received a slice of data. This needs to /// Returns an already pending slice that was being downloaded but hasn't arrived yet. This is mostly used at the end of the file
/// - carve in the map of chunks what is received, what is not. /// in order to re-ask pendign slices to active peers while slow peers take a lot of time to send their remaining slices.
/// - tell which chunks are finished. For this, each interval must know what chunk number it has been attributed ///
/// when the interval is split in the middle, the number of intervals for the chunk is increased. If the interval is bool reAskPendingChunk(const RsPeerId& peer_id,uint32_t size_hint,uint64_t& offset,uint32_t& size);
/// completely covered by the data, the interval number is decreased.
virtual void dataReceived(const ftChunk::ChunkId& c_id) ; /// Notify received a slice of data. This needs to
/// - carve in the map of chunks what is received, what is not.
/// - tell which chunks are finished. For this, each interval must know what chunk number it has been attributed
/// when the interval is split in the middle, the number of intervals for the chunk is increased. If the interval is
/// completely covered by the data, the interval number is decreased.
virtual void dataReceived(const ftChunk::OffsetInFile& c_id) ;
/// Decides how chunks are selected. /// Decides how chunks are selected.
/// STREAMING: the 1st chunk is always returned /// STREAMING: the 1st chunk is always returned
@ -163,7 +175,7 @@ class ChunkMap
/// Remove active chunks that have not received any data for the last 60 seconds, and return /// Remove active chunks that have not received any data for the last 60 seconds, and return
/// the list of slice numbers that should be canceled. /// the list of slice numbers that should be canceled.
void removeInactiveChunks(std::vector<ftChunk::ChunkId>& to_remove) ; void removeInactiveChunks(std::vector<ftChunk::OffsetInFile>& to_remove) ;
/// Updates the peer's availablility map /// Updates the peer's availablility map
// //
@ -214,7 +226,7 @@ class ChunkMap
uint32_t _chunk_size ; //! Size of chunks. Common to all chunks. uint32_t _chunk_size ; //! Size of chunks. Common to all chunks.
FileChunksInfo::ChunkStrategy _strategy ; //! how do we allocate new chunks FileChunksInfo::ChunkStrategy _strategy ; //! how do we allocate new chunks
std::map<RsPeerId,Chunk> _active_chunks_feed ; //! vector of chunks being downloaded. Exactly 1 chunk per peer. std::map<RsPeerId,Chunk> _active_chunks_feed ; //! vector of chunks being downloaded. Exactly 1 chunk per peer.
std::map<ChunkNumber,ChunkDownloadInfo> _slices_to_download ; //! list of (slice id,slice size) std::map<ChunkNumber,ChunkDownloadInfo> _slices_to_download ; //! list of (slice offset,slice size) currently being downloaded
std::vector<FileChunksInfo::ChunkState> _map ; //! vector of chunk state over the whole file std::vector<FileChunksInfo::ChunkState> _map ; //! vector of chunk state over the whole file
std::map<RsPeerId,SourceChunksInfo> _peers_chunks_availability ; //! what does each source peer have std::map<RsPeerId,SourceChunksInfo> _peers_chunks_availability ; //! what does each source peer have
uint64_t _total_downloaded ; //! completion for the file uint64_t _total_downloaded ; //! completion for the file

View File

@ -240,7 +240,7 @@ void ftFileCreator::removeInactiveChunks()
#ifdef FILE_DEBUG #ifdef FILE_DEBUG
std::cerr << "ftFileCreator::removeInactiveChunks(): looking for old chunks." << std::endl ; std::cerr << "ftFileCreator::removeInactiveChunks(): looking for old chunks." << std::endl ;
#endif #endif
std::vector<ftChunk::ChunkId> to_remove ; std::vector<ftChunk::OffsetInFile> to_remove ;
chunkMap.removeInactiveChunks(to_remove) ; chunkMap.removeInactiveChunks(to_remove) ;
@ -421,7 +421,9 @@ int ftFileCreator::locked_notifyReceived(uint64_t offset, uint32_t chunk_size)
if(!found) if(!found)
{ {
#ifdef FILE_DEBUG
std::cerr << "ftFileCreator::locked_notifyReceived(): failed to find an active slice for " << offset << "+" << chunk_size << ", hash = " << hash << ": dropping data." << std::endl; std::cerr << "ftFileCreator::locked_notifyReceived(): failed to find an active slice for " << offset << "+" << chunk_size << ", hash = " << hash << ": dropping data." << std::endl;
#endif
return 0; /* ignoring */ return 0; /* ignoring */
} }
} }
@ -531,7 +533,14 @@ bool ftFileCreator::getMissingChunk(const RsPeerId& peer_id,uint32_t size_hint,u
ftChunk chunk ; ftChunk chunk ;
if(!chunkMap.getDataChunk(peer_id,size_hint,chunk,source_chunk_map_needed)) if(!chunkMap.getDataChunk(peer_id,size_hint,chunk,source_chunk_map_needed))
{
// No chunks are available. We brutally re-ask an ongoing chunk to another peer.
if(chunkMap.reAskPendingChunk(peer_id,size_hint,offset,size))
return true ;
return false ; return false ;
}
#ifdef FILE_DEBUG #ifdef FILE_DEBUG
std::cerr << "ffc::getMissingChunk() Retrieved new chunk: " << chunk << std::endl ; std::cerr << "ffc::getMissingChunk() Retrieved new chunk: " << chunk << std::endl ;

View File

@ -639,8 +639,17 @@ void RsCollectionDialog::directoryLoaded(QString dirLoaded)
*/ */
void RsCollectionDialog::updateSizes() void RsCollectionDialog::updateSizes()
{ {
ui._selectedFiles_TL->setText(QString::number(getRootItem()->data(COLUMN_FILEC,ROLE_SELFILEC).toULongLong())) ; uint64_t total_size = 0 ;
ui._totalSize_TL->setText(misc::friendlyUnit(getRootItem()->data(COLUMN_SIZE,ROLE_SELSIZE).toULongLong())) ; uint32_t total_count = 0 ;
for(uint32_t i=0;i<ui._fileEntriesTW->topLevelItemCount();++i)
{
total_size += ui._fileEntriesTW->topLevelItem(i)->data(COLUMN_SIZE ,ROLE_SELSIZE ).toULongLong();
total_count += ui._fileEntriesTW->topLevelItem(i)->data(COLUMN_FILEC,ROLE_SELFILEC).toULongLong();
}
ui._selectedFiles_TL->setText(QString::number(total_count));
ui._totalSize_TL->setText(misc::friendlyUnit(total_size));
} }
/** /**