added regular cleaning of inactive chunks and slices, so that they can be treated by a different peer

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@2102 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2010-01-21 12:31:00 +00:00
parent f481dbef59
commit e2b807aad7
6 changed files with 112 additions and 3 deletions

View File

@ -1,11 +1,14 @@
#include <math.h>
#ifdef DEBUG_FTCHUNK
#include <assert.h>
#endif
#include <math.h>
#include <stdlib.h>
#include <rsiface/rspeers.h>
#include "ftchunkmap.h"
#include <time.h>
static const uint32_t SOURCE_CHUNK_MAP_UPDATE_PERIOD = 60 ; //! TTL for chunkmap info
static const uint32_t SOURCE_CHUNK_MAP_UPDATE_PERIOD = 60 ; //! TTL for chunkmap info
static const uint32_t INACTIVE_CHUNK_TIME_LAPSE = 60 ; //! TTL for an inactive chunk
std::ostream& operator<<(std::ostream& o,const ftChunk& c)
{
@ -78,7 +81,9 @@ void ChunkMap::dataReceived(const ftChunk::ChunkId& cid)
if(itc == _slices_to_download.end())
{
std::cerr << "!!! ChunkMap::dataReceived: error: ChunkId " << cid << " corresponds to chunk number " << n << ", which is not being downloaded!" << std::endl ;
#ifdef DEBUG_FTCHUNK
assert(false) ;
#endif
return ;
}
@ -87,13 +92,16 @@ void ChunkMap::dataReceived(const ftChunk::ChunkId& cid)
if(it == itc->second._slices.end())
{
std::cerr << "!!! ChunkMap::dataReceived: chunk " << cid << " is not found in slice lst of chunk number " << n << std::endl ;
#ifdef DEBUG_FTCHUNK
assert(false) ;
#endif
return ;
}
_total_downloaded += it->second ;
itc->second._remains -= it->second ;
itc->second._slices.erase(it) ;
itc->second._last_data_received = time(NULL) ; // update time stamp
#ifdef DEBUG_FTCHUNK
std::cerr << "*** ChunkMap::dataReceived: received data chunk " << cid << " for chunk number " << n << ", local remains=" << itc->second._remains << ", total downloaded=" << _total_downloaded << ", remains=" << _file_size - _total_downloaded << std::endl ;
@ -171,6 +179,7 @@ bool ChunkMap::getDataChunk(const std::string& peer_id,uint32_t size_hint,ftChun
//
_active_chunks_feed[peer_id].getSlice(size_hint,chunk) ;
_slices_to_download[chunk.offset/_chunk_size]._slices[chunk.id] = chunk.size ;
_slices_to_download[chunk.offset/_chunk_size]._last_data_received = time(NULL) ;
if(_active_chunks_feed[peer_id].empty())
_active_chunks_feed.erase(_active_chunks_feed.find(peer_id)) ;
@ -181,6 +190,45 @@ bool ChunkMap::getDataChunk(const std::string& peer_id,uint32_t size_hint,ftChun
return true ;
}
void ChunkMap::removeInactiveChunks(std::vector<ftChunk::ChunkId>& to_remove)
{
to_remove.clear() ;
time_t now = time(NULL) ;
for(std::map<ChunkNumber,ChunkDownloadInfo>::iterator it(_slices_to_download.begin());it!=_slices_to_download.end();)
if(now - it->second._last_data_received > (int)INACTIVE_CHUNK_TIME_LAPSE)
{
#ifdef DEBUG_FTCHUNK
std::cerr << "ChunkMap::removeInactiveChunks(): removing inactive chunk " << it->first << ", time lapse=" << now - it->second._last_data_received << std::endl ;
#endif
// First, remove all slices from this chunk
//
std::map<ChunkNumber,ChunkDownloadInfo>::iterator tmp(it) ;
for(std::map<ftChunk::ChunkId,uint32_t>::const_iterator it2(it->second._slices.begin());it2!=it->second._slices.end();++it2)
to_remove.push_back(it2->first) ;
_map[it->first] = FileChunksInfo::CHUNK_OUTSTANDING ; // reset the chunk
// Also remove the chunk from the chunk feed, to free the associated peer.
//
for(std::map<std::string,Chunk>::iterator it3=_active_chunks_feed.begin();it3!=_active_chunks_feed.end();)
if(it3->second._start == _chunk_size*uint64_t(it->first))
{
std::map<std::string,Chunk>::iterator tmp3 = it3 ;
++it3 ;
_active_chunks_feed.erase(tmp3) ;
}
else
++it3 ;
++it ;
_slices_to_download.erase(tmp) ;
}
else
++it ;
}
bool ChunkMap::isChunkAvailable(uint64_t offset, uint32_t chunk_size) const
{
uint32_t chunk_number_start = offset/(uint64_t)_chunk_size ;

View File

@ -71,6 +71,7 @@ class ChunkDownloadInfo
public:
std::map<ftChunk::ChunkId,uint32_t> _slices ;
uint32_t _remains ;
time_t _last_data_received ;
};
class SourceChunksInfo
@ -139,6 +140,10 @@ class ChunkMap
/// This function is used by the parent ftFileProvider to know whether the chunk can be sent or not.
bool isChunkAvailable(uint64_t offset, uint32_t chunk_size) const ;
/// Remove active chunks that have not received any data for the last 60 seconds, and return
/// the list of slice numbers that should be canceled.
void removeInactiveChunks(std::vector<ftChunk::ChunkId>& to_remove) ;
/// Updates the peer's availablility map
//
void setPeerAvailabilityMap(const std::string& peer_id,const CompressedChunkMap& peer_map) ;

View File

@ -57,7 +57,8 @@
* #define CONTROL_DEBUG 1
*****/
static const uint32_t SAVE_TRANSFERS_DELAY = 30 ; // save transfer progress every 30 seconds.
static const uint32_t SAVE_TRANSFERS_DELAY = 61 ; // save transfer progress every 61 seconds.
static const uint32_t INACTIVE_CHUNKS_CHECK_DELAY = 60 ; // save transfer progress every 61 seconds.
ftFileControl::ftFileControl()
:mTransfer(NULL), mCreator(NULL),
@ -82,6 +83,7 @@ ftFileControl::ftFileControl(std::string fname,
ftController::ftController(CacheStrapper *cs, ftDataMultiplex *dm, std::string configDir)
:CacheTransfer(cs), p3Config(CONFIG_TYPE_FT_CONTROL),
last_save_time(0),
last_clean_time(0),
mDataplex(dm),
mTurtle(NULL),
mFtActive(false),
@ -195,6 +197,16 @@ void ftController::run()
last_save_time = now ;
}
if((int)now - (int)last_clean_time > (int)INACTIVE_CHUNKS_CHECK_DELAY)
{
RsStackMutex stack(ctrlMutex); /******* LOCKED ********/
for(std::map<std::string,ftFileControl>::iterator it(mDownloads.begin());it!=mDownloads.end();++it)
it->second.mCreator->removeInactiveChunks() ;
last_clean_time = now ;
}
if (doPending)
{
if (!handleAPendingRequest())

View File

@ -193,6 +193,7 @@ bool setPeerState(ftTransferModule *tm, std::string id,
uint32_t maxrate, bool online);
time_t last_save_time ;
time_t last_clean_time ;
/* pointers to other components */
ftSearch *mSearch;

View File

@ -135,6 +135,44 @@ bool ftFileCreator::addFileData(uint64_t offset, uint32_t chunk_size, void *data
return 1;
}
void ftFileCreator::removeInactiveChunks()
{
RsStackMutex stack(ftcMutex); /********** STACK LOCKED MTX ******/
#ifdef FILE_DEBUG
std::cerr << "ftFileCreator::removeInactiveChunks(): looking for old chunks." << std::endl ;
#endif
std::vector<ftChunk::ChunkId> to_remove ;
chunkMap.removeInactiveChunks(to_remove) ;
#ifdef FILE_DEBUG
if(!to_remove.empty())
std::cerr << "ftFileCreator::removeInactiveChunks(): removing slice ids: " ;
#endif
// This double loop looks costly, but it's called on very few chunks, and not often, so it's ok.
//
for(uint32_t i=0;i<to_remove.size();++i)
{
#ifdef FILE_DEBUG
std::cerr << to_remove[i] << " " ;
#endif
for(std::map<uint64_t,ftChunk>::iterator it(mChunks.begin());it!=mChunks.end();)
if(it->second.id == to_remove[i])
{
std::map<uint64_t,ftChunk>::iterator tmp(it) ;
++it ;
mChunks.erase(tmp) ;
}
else
++it ;
}
#ifdef FILE_DEBUG
if(!to_remove.empty())
std::cerr << std::endl ;
#endif
}
int ftFileCreator::initializeFileAttrs()
{
std::cerr << "ftFileCreator::initializeFileAttrs() Filename: ";

View File

@ -65,6 +65,11 @@ class ftFileCreator: public ftFileProvider
//
bool getMissingChunk(const std::string& peer_id,uint32_t size_hint,uint64_t& offset, uint32_t& size,bool& is_chunk_map_too_old);
// Takes care of purging any inactive chunks. This should be called regularly, because some peers may disconnect
// and let inactive chunks not finished.
//
void removeInactiveChunks() ;
// actually store data in the file, and update chunks info
//
bool addFileData(uint64_t offset, uint32_t chunk_size, void *data);