diff --git a/libretroshare/src/ft/ftchunkmap.h b/libretroshare/src/ft/ftchunkmap.h index 1f60b6fcc..1a269ef82 100644 --- a/libretroshare/src/ft/ftchunkmap.h +++ b/libretroshare/src/ft/ftchunkmap.h @@ -34,7 +34,7 @@ class ftChunk public: typedef uint64_t ChunkId ; - ftChunk():offset(0), size(0), ts(0) {} + ftChunk():offset(0), size(0), ts(0),ref_cnt(NULL) {} friend std::ostream& operator<<(std::ostream& o,const ftChunk& f) ; @@ -42,6 +42,7 @@ class ftChunk uint64_t size; // size remaining to download ChunkId id ; // id of the chunk. Equal to the starting offset of the chunk time_t ts; // time of last data received + int *ref_cnt; // shared counter of number of sub-blocks. Used when a slice gets split. std::string peer_id ; }; diff --git a/libretroshare/src/ft/ftfilecreator.cc b/libretroshare/src/ft/ftfilecreator.cc index abbc22311..910e6ac61 100644 --- a/libretroshare/src/ft/ftfilecreator.cc +++ b/libretroshare/src/ft/ftfilecreator.cc @@ -8,8 +8,8 @@ * #define FILE_DEBUG 1 ******/ -#define CHUNK_MAX_AGE 40 -#define MAX_FTCHUNKS_PER_PEER 5 +#define CHUNK_MAX_AGE 40 +#define MAX_FTCHUNKS_PER_PEER 5 /*********************************************************** * @@ -209,6 +209,8 @@ void ftFileCreator::removeInactiveChunks() { std::map::iterator tmp(it) ; ++it ; + if(--*tmp->second.ref_cnt == 0) + delete tmp->second.ref_cnt; --mChunksPerPeer[tmp->second.peer_id].cnt ; mChunks.erase(tmp) ; } @@ -307,21 +309,72 @@ int ftFileCreator::locked_notifyReceived(uint64_t offset, uint32_t chunk_size) /* find the chunk */ std::map::iterator it = mChunks.find(offset); + ftChunk chunk ; - if (it == mChunks.end()) + if(it == mChunks.end()) { + // Chunk is not found. Maybe that is because the packet is in the middle + // of an existing chunk. This case occurs whenever a packet is lost due + // to temporarily interrupted connexion. In such a case we split the + // chunk. Any pending block will be re-asked to the source after 40 secs. + // #ifdef FILE_DEBUG - std::cerr << "ftFileCreator::locked_notifyReceived() "; - std::cerr << " Failed to match to existing chunk - ignoring"; - std::cerr << std::endl; + std::cerr << "ftFileCreator::locked_notifyReceived(): Failed to match to existing chunk. A packet was probably dropped. Strategy is:" << std::endl; + std::cerr << " - find corresponding chunk, split it in two pieces and ask again the first part." << std::endl; locked_printChunkMap(); #endif - return 0; /* ignoring */ - } + bool found = false ; - ftChunk chunk = it->second; - mChunks.erase(it); + for(std::map::iterator it2=mChunks.begin();it2!=mChunks.end();++it2) + if( it2->second.offset < offset && it2->second.size+it2->second.offset >= chunk_size+offset) // found it if it started strictly after the beginning of the chunk and ends before its end. + { + it = it2 ; +#ifdef FILE_DEBUG + std::cerr << "ftFileCreator::locked_notifyReceived(): Chunk found: " << it->second.offset << " to " << it->second.offset + it->second.size << std::endl; +#endif + + // 1 - split the chunk into two parts. Re-ask the first part, and keep the second part + // as ongoing. + + ftChunk part1( it->second ); + part1.size = offset - it->second.offset ; // always > 0 + + chunk = it->second ; // saves the data, as it will be erased by next line + mChunks[part1.offset] = part1; + + chunk.offset = offset ; + chunk.size -= part1.size ; + + // 2 - we need to be extra careful: + // - the chunks will have the same id. That's potentially a problem for completing the slice + // we should keep a list of pending chunkIds, so as to only call chunkMap.dataReceived() when + // all parts are obtained. + // - new parts arriving in the second part cannot interfere since they should come in order. + + (*chunk.ref_cnt)++ ; + +#ifdef FILE_DEBUG + std::cerr << "Created two sub chunks. Ref_cnt = " << *chunk.ref_cnt << std::endl; + std::cerr << " chunk1: " << part1 << std::endl; + std::cerr << " chunk2: " << chunk << std::endl; +#endif + + found = true ; + break ; + } + + if(!found) + { + std::cerr << "ftFileCreator::locked_notifyReceived(): failed to find an active slice for " << offset << "+" << chunk_size << ": dropping data." << std::endl; + return 0; /* ignoring */ + } + } + else + { + chunk = it->second; + mChunks.erase(it); + } if (chunk.size != chunk_size) { @@ -330,11 +383,19 @@ int ftFileCreator::locked_notifyReceived(uint64_t offset, uint32_t chunk_size) chunk.offset += chunk_size; mChunks[chunk.offset] = chunk; } - else // notify the chunkmap that the slice is finished, and decrement the number of chunks for this peer. + else if( --*chunk.ref_cnt == 0) // notify the chunkmap that the slice is finished, and decrement the number of chunks for this peer. { +#ifdef FILE_DEBUG + std::cerr << "Chunk finished and ref cnt = " << *chunk.ref_cnt << ": deleting." << std::endl; +#endif chunkMap.dataReceived(chunk.id) ; --mChunksPerPeer[chunk.peer_id].cnt ; + delete chunk.ref_cnt ; // delete the counter } +#ifdef FILE_DEBUG + else + std::cerr << "Chunk finished but ref cnt = " << *chunk.ref_cnt << ": not deleting." << std::endl; +#endif _last_recv_time_t = time(NULL) ; @@ -381,7 +442,25 @@ bool ftFileCreator::getMissingChunk(const std::string& peer_id,uint32_t size_hin locked_printChunkMap(); #endif source_chunk_map_needed = false ; + time_t now = time(NULL) ; + // 0 - is there a faulting chunk that would need to be asked again ? + + for(std::map::iterator it(mChunks.begin());it!=mChunks.end();++it) + if(it->second.ts + CHUNK_MAX_AGE < now) + { + offset = it->second.offset ; + size = it->second.size ; + it->second.ts = now ; + +#ifdef FILE_DEBUG + std::cerr << "ftFileCreator::getMissingChunk(): re-askign for chunk that wasn't received: " << offset << " + " << size << std::endl; +#endif + return true ; + } + + // 1 - is there an ongoing 1MB chunk for which we need to take a new slice? + // uint32_t& chunks_for_this_peer(mChunksPerPeer[peer_id].cnt) ; if(chunks_for_this_peer >= MAX_FTCHUNKS_PER_PEER) @@ -403,6 +482,8 @@ bool ftFileCreator::getMissingChunk(const std::string& peer_id,uint32_t size_hin std::cerr << "ffc::getMissingChunk() Retrieved new chunk: " << chunk << std::endl ; #endif + chunk.ref_cnt = new int ; + *chunk.ref_cnt = 1 ; mChunks[chunk.offset] = chunk ; offset = chunk.offset ;