diff --git a/libretroshare/src/ft/ftchunkmap.cc b/libretroshare/src/ft/ftchunkmap.cc index 3fd64892b..b83c98769 100644 --- a/libretroshare/src/ft/ftchunkmap.cc +++ b/libretroshare/src/ft/ftchunkmap.cc @@ -39,6 +39,7 @@ static const uint32_t SOURCE_CHUNK_MAP_UPDATE_PERIOD = 60 ; //! TTL for chunkmap info static const uint32_t INACTIVE_CHUNK_TIME_LAPSE = 3600 ; //! TTL for an inactive chunk static const uint32_t FT_CHUNKMAP_MAX_CHUNK_JUMP = 50 ; //! Maximum chunk jump in progressive DL mode +static const uint32_t FT_CHUNKMAP_MAX_SLICE_REASK_DELAY = 10 ; //! Maximum chunk jump in progressive DL mode std::ostream& operator<<(std::ostream& o,const ftChunk& c) { @@ -120,7 +121,7 @@ void ChunkMap::setAvailabilityMap(const CompressedChunkMap& map) } } -void ChunkMap::dataReceived(const ftChunk::ChunkId& cid) +void ChunkMap::dataReceived(const ftChunk::OffsetInFile& cid) { // 1 - find which chunk contains the received data. // @@ -139,7 +140,7 @@ void ChunkMap::dataReceived(const ftChunk::ChunkId& cid) return ; } - std::map::iterator it(itc->second._slices.find(cid)) ; + std::map::iterator it(itc->second._slices.find(cid)) ; if(it == itc->second._slices.end()) { @@ -150,8 +151,8 @@ void ChunkMap::dataReceived(const ftChunk::ChunkId& cid) return ; } - _total_downloaded += it->second ; - itc->second._remains -= it->second ; + _total_downloaded += it->second.size ; + itc->second._remains -= it->second.size ; itc->second._slices.erase(it) ; itc->second._last_data_received = time(NULL) ; // update time stamp @@ -256,6 +257,36 @@ void ChunkMap::setChunkCheckingResult(uint32_t chunk_number,bool check_succeeded } } +bool ChunkMap::reAskPendingChunk(const RsPeerId& peer_id,uint32_t size_hint,uint64_t& offset,uint32_t& size) +{ + // make sure that we're at the end of the file. No need to be too greedy in the middle of it. + + for(uint32_t i=0;i<_map.size();++i) + if(_map[i] == FileChunksInfo::CHUNK_OUTSTANDING) + return false ; + + time_t now = time(NULL); + + for(std::map::iterator it(_slices_to_download.begin());it!=_slices_to_download.end();++it) + for(std::map::iterator it2(it->second._slices.begin());it2!=it->second._slices.end();++it2) + if(it2->second.request_time + FT_CHUNKMAP_MAX_SLICE_REASK_DELAY < now && it2->second.peers.end()==it2->second.peers.find(peer_id)) + { + offset = it2->first; + size = it2->second.size ; + +#ifdef DEBUG_FTCHUNK + std::cerr << "*** ChunkMap::reAskPendingChunk: re-asking slice (" << offset << ", " << size << ") to peer " << peer_id << std::endl; +#endif + + it2->second.request_time = now ; + it2->second.peers.insert(peer_id) ; + + return true ; + } + + return false ; +} + // Warning: a chunk may be empty, but still being downloaded, so asking new slices from it // will produce slices of size 0. This happens at the end of each chunk. // --> I need to get slices from the next chunk, in such a case. @@ -295,7 +326,7 @@ bool ChunkMap::getDataChunk(const RsPeerId& peer_id,uint32_t size_hint,ftChunk& ChunkDownloadInfo& cdi(_slices_to_download[c]) ; falsafe_it = pit ; // let's keep this one just in case. - if(cdi._slices.rbegin() != cdi._slices.rend() && cdi._slices.rbegin()->second*0.7 <= (float)size_hint) + if(cdi._slices.rbegin() != cdi._slices.rend() && cdi._slices.rbegin()->second.size*0.7 <= (float)size_hint) { it = pit ; #ifdef DEBUG_FTCHUNK @@ -348,9 +379,14 @@ bool ChunkMap::getDataChunk(const RsPeerId& peer_id,uint32_t size_hint,ftChunk& // Get the first slice of the chunk, that is at most of length size // it->second.getSlice(size_hint,chunk) ; - _slices_to_download[chunk.offset/_chunk_size]._slices[chunk.id] = chunk.size ; _slices_to_download[chunk.offset/_chunk_size]._last_data_received = time(NULL) ; + ChunkDownloadInfo::SliceRequestInfo& r(_slices_to_download[chunk.offset/_chunk_size]._slices[chunk.id]); + + r.size = chunk.size ; + r.request_time = time(NULL); + r.peers.insert(peer_id); + chunk.peer_id = peer_id ; #ifdef DEBUG_FTCHUNK @@ -362,7 +398,7 @@ bool ChunkMap::getDataChunk(const RsPeerId& peer_id,uint32_t size_hint,ftChunk& return true ; } -void ChunkMap::removeInactiveChunks(std::vector& to_remove) +void ChunkMap::removeInactiveChunks(std::vector& to_remove) { to_remove.clear() ; time_t now = time(NULL) ; @@ -377,7 +413,7 @@ void ChunkMap::removeInactiveChunks(std::vector& to_remove) // std::map::iterator tmp(it) ; - for(std::map::const_iterator it2(it->second._slices.begin());it2!=it->second._slices.end();++it2) + for(std::map::const_iterator it2(it->second._slices.begin());it2!=it->second._slices.end();++it2) to_remove.push_back(it2->first) ; _map[it->first] = FileChunksInfo::CHUNK_OUTSTANDING ; // reset the chunk diff --git a/libretroshare/src/ft/ftchunkmap.h b/libretroshare/src/ft/ftchunkmap.h index 3ee4a1234..f7c696b9a 100644 --- a/libretroshare/src/ft/ftchunkmap.h +++ b/libretroshare/src/ft/ftchunkmap.h @@ -32,7 +32,7 @@ class ftController ; class ftChunk { public: - typedef uint64_t ChunkId ; + typedef uint64_t OffsetInFile ; ftChunk():offset(0), size(0), id(0), ts(0),ref_cnt(NULL) {} @@ -40,7 +40,7 @@ class ftChunk uint64_t offset; // current offset of the slice uint64_t size; // size remaining to download - ChunkId id ; // id of the chunk. Equal to the starting offset of the chunk + OffsetInFile id ; // id of the chunk. Equal to the starting offset of the chunk time_t ts; // time of last data received int *ref_cnt; // shared counter of number of sub-blocks. Used when a slice gets split. RsPeerId peer_id ; @@ -70,10 +70,17 @@ class Chunk uint64_t _end ; // const }; -class ChunkDownloadInfo +struct ChunkDownloadInfo { public: - std::map _slices ; + struct SliceRequestInfo + { + uint32_t size ; // size of the slice + time_t request_time ; // last request time + std::set peers ; // peers the slice was requested to. Normally only one, except at the end of the file. + }; + + std::map _slices ; uint32_t _remains ; time_t _last_data_received ; }; @@ -109,24 +116,29 @@ class ChunkMap /// destructor virtual ~ChunkMap() {} - /// Returns an slice of data to be asked to the peer within a chunk. + /// Returns an slice of data to be asked to the peer within a chunk. /// If a chunk is already been downloaded by this peer, take a slice at /// the beginning of this chunk, or at least where it starts. - /// If not, randomly/streamly select a new chunk depending on the strategy. - /// adds an entry in the chunk_ids map, and sets up 1 interval for it. - /// the chunk should be available from the designated peer. + /// If not, randomly/streamly select a new chunk depending on the strategy. + /// adds an entry in the chunk_ids map, and sets up 1 interval for it. + /// the chunk should be available from the designated peer. /// On return: /// - source_chunk_map_needed = true if the source map should be asked virtual bool getDataChunk(const RsPeerId& peer_id,uint32_t size_hint,ftChunk& chunk,bool& source_chunk_map_needed) ; - /// Notify received a slice of data. This needs to - /// - carve in the map of chunks what is received, what is not. - /// - tell which chunks are finished. For this, each interval must know what chunk number it has been attributed - /// when the interval is split in the middle, the number of intervals for the chunk is increased. If the interval is - /// completely covered by the data, the interval number is decreased. + /// Returns an already pending slice that was being downloaded but hasn't arrived yet. This is mostly used at the end of the file + /// in order to re-ask pendign slices to active peers while slow peers take a lot of time to send their remaining slices. + /// + bool reAskPendingChunk(const RsPeerId& peer_id,uint32_t size_hint,uint64_t& offset,uint32_t& size); - virtual void dataReceived(const ftChunk::ChunkId& c_id) ; + /// Notify received a slice of data. This needs to + /// - carve in the map of chunks what is received, what is not. + /// - tell which chunks are finished. For this, each interval must know what chunk number it has been attributed + /// when the interval is split in the middle, the number of intervals for the chunk is increased. If the interval is + /// completely covered by the data, the interval number is decreased. + + virtual void dataReceived(const ftChunk::OffsetInFile& c_id) ; /// Decides how chunks are selected. /// STREAMING: the 1st chunk is always returned @@ -163,7 +175,7 @@ class ChunkMap /// Remove active chunks that have not received any data for the last 60 seconds, and return /// the list of slice numbers that should be canceled. - void removeInactiveChunks(std::vector& to_remove) ; + void removeInactiveChunks(std::vector& to_remove) ; /// Updates the peer's availablility map // @@ -214,7 +226,7 @@ class ChunkMap uint32_t _chunk_size ; //! Size of chunks. Common to all chunks. FileChunksInfo::ChunkStrategy _strategy ; //! how do we allocate new chunks std::map _active_chunks_feed ; //! vector of chunks being downloaded. Exactly 1 chunk per peer. - std::map _slices_to_download ; //! list of (slice id,slice size) + std::map _slices_to_download ; //! list of (slice offset,slice size) currently being downloaded std::vector _map ; //! vector of chunk state over the whole file std::map _peers_chunks_availability ; //! what does each source peer have uint64_t _total_downloaded ; //! completion for the file diff --git a/libretroshare/src/ft/ftfilecreator.cc b/libretroshare/src/ft/ftfilecreator.cc index c47cc1c4c..56bf78bf7 100644 --- a/libretroshare/src/ft/ftfilecreator.cc +++ b/libretroshare/src/ft/ftfilecreator.cc @@ -240,7 +240,7 @@ void ftFileCreator::removeInactiveChunks() #ifdef FILE_DEBUG std::cerr << "ftFileCreator::removeInactiveChunks(): looking for old chunks." << std::endl ; #endif - std::vector to_remove ; + std::vector to_remove ; chunkMap.removeInactiveChunks(to_remove) ; @@ -421,7 +421,9 @@ int ftFileCreator::locked_notifyReceived(uint64_t offset, uint32_t chunk_size) if(!found) { +#ifdef FILE_DEBUG std::cerr << "ftFileCreator::locked_notifyReceived(): failed to find an active slice for " << offset << "+" << chunk_size << ", hash = " << hash << ": dropping data." << std::endl; +#endif return 0; /* ignoring */ } } @@ -531,7 +533,14 @@ bool ftFileCreator::getMissingChunk(const RsPeerId& peer_id,uint32_t size_hint,u ftChunk chunk ; if(!chunkMap.getDataChunk(peer_id,size_hint,chunk,source_chunk_map_needed)) + { + // No chunks are available. We brutally re-ask an ongoing chunk to another peer. + + if(chunkMap.reAskPendingChunk(peer_id,size_hint,offset,size)) + return true ; + return false ; + } #ifdef FILE_DEBUG std::cerr << "ffc::getMissingChunk() Retrieved new chunk: " << chunk << std::endl ;