diff --git a/libretroshare/src/ft/ftchunkmap.cc b/libretroshare/src/ft/ftchunkmap.cc index cc8dda5c2..d3dcfa849 100644 --- a/libretroshare/src/ft/ftchunkmap.cc +++ b/libretroshare/src/ft/ftchunkmap.cc @@ -1,34 +1,251 @@ #include +#include +#include #include "ftchunkmap.h" -ChunkMap::ChunkMap(uint64_t s) +std::ostream& operator<<(std::ostream& o,const ftChunk& c) { - file_size = s ; + return o << "\tChunk [" << c.offset << "] size: " << c.size << " ChunkId: " << c.id << " Age: " << time(NULL) - c.ts ; +} - chunk_size = 1024*1024 ; // 1MB chunks - uint64_t n = s/chunk_size ; - if(s% (uint64_t)chunk_size != 0) +// Chunk: very bold implementation for now. We should compress the bits to have +// 32 of them per uint32_t value, of course! +// +Chunk::Chunk(uint64_t start,uint32_t size) + : _start(start),_offset(start),_end( (uint64_t)size + start ) +{ +} + +void Chunk::getSlice(uint32_t size_hint,ftChunk& chunk) +{ + // Take the current offset + chunk.offset = _offset ; + chunk.size = std::min(size_hint,(uint32_t)(_end-_offset)) ; + chunk.id = _offset ; + chunk.ts = time(NULL) ; + + // push the slice marker into currently handled slices. + _offset += chunk.size ; +} + +//uint32_t Chunk::dataReceived(const ftChunk::ChunkId cid) +//{ +//#ifdef DEBUG_FTCHUNK +// std::cerr << "*** Chunk::dataReceived: slice " << cid << " finished" << std::endl ; +//#endif +// std::map::iterator it( _slices_to_download.find(cid) ) ; +// +// if(it == _slices_to_download.end()) +// { +// std::cerr << "!!! Chunk::dataReceived: could not find chunk " << cid << ": probably a fatal error" << std::endl ; +// return 0 ; +// } +// else +// { +// uint32_t n = it->second ; +// _slices_to_download.erase(it) ; +// return n ; +// } +//} + +ChunkMap::ChunkMap(uint64_t s) + :_file_size(s),_chunk_size(1024*1024) // 1MB chunks +{ + uint64_t n = s/(uint64_t)_chunk_size ; + if(s% (uint64_t)_chunk_size != 0) ++n ; - chunks.resize(n,CHUNK_OUTSTANDING) ; + _map.resize(n,FileChunksInfo::CHUNK_OUTSTANDING) ; + _total_downloaded = 0 ; + _strategy = FileChunksInfo::CHUNK_STRATEGY_STREAMING ; +#ifdef DEBUG_FTCHUNK + std::cerr << "*** ChunkMap::ChunkMap: starting new chunkmap:" << std::endl ; + std::cerr << " File size: " << s << std::endl ; + std::cerr << " Strategy: " << _strategy << std::endl ; + std::cerr << " ChunkSize: " << _chunk_size << std::endl ; + std::cerr << " Number of Chunks: " << n << std::endl ; +#endif } -void ChunkMap::received(uint64_t offset) +void ChunkMap::dataReceived(const ftChunk::ChunkId& cid) { - uint64_t n = offset/chunk_size ; + // 1 - find which chunk contains the received data. + // - for(uint64_t i=0;i::iterator itc(_slices_to_download.find(n)) ; + + if(itc == _slices_to_download.end()) + { + std::cerr << "!!! ChunkMap::dataReceived: error: ChunkId " << cid << " corresponds to chunk number " << n << ", which is not being downloaded!" << std::endl ; + assert(false) ; + return ; + } + + std::map::iterator it(itc->second._slices.find(cid)) ; + + if(it == itc->second._slices.end()) + { + std::cerr << "!!! ChunkMap::dataReceived: chunk " << cid << " is not found in slice lst of chunk number " << n << std::endl ; + assert(false) ; + return ; + } + + _total_downloaded += it->second ; + itc->second._remains -= it->second ; + itc->second._slices.erase(it) ; + +#ifdef DEBUG_FTCHUNK + std::cerr << "*** ChunkMap::dataReceived: received data chunk " << cid << " for chunk number " << n << ", local remains=" << itc->second._remains << ", total downloaded=" << _total_downloaded << ", remains=" << _file_size - _total_downloaded << std::endl ; +#endif + if(itc->second._remains == 0) // the chunk was completely downloaded + { +#ifdef DEBUG_FTCHUNK + std::cerr << "*** ChunkMap::dataReceived: Chunk is complete. Removing it." << std::endl ; +#endif + _map[n] = FileChunksInfo::CHUNK_DONE ; + _slices_to_download.erase(itc) ; + } } -void ChunkMap::requested(uint64_t offset, uint32_t size) +// Warning: a chunk may be empty, but still being downloaded, so asking new slices from it +// will produce slices of size 0. This happens at the end of each chunk. +// --> I need to get slices from the next chunk, in such a case. +// --> solution: +// - have too chunks maps: +// 1 indexed by peer id to feed the getChunk method +// - chunks pushed when new chunks are needed +// - chunks removed when empty +// 1 indexed by chunk id to account for chunks being downloaded +// - chunks pushed when new chunks are needed +// - chunks removed when completely downloaded +// +bool ChunkMap::getDataChunk(const std::string& peer_id,uint32_t size_hint,ftChunk& chunk) { - int start = offset/chunk_size ; - int end = (int)ceil((offset+size)/chunk_size) ; +#ifdef DEBUG_FTCHUNK + std::cerr << "*** ChunkMap::getDataChunk: size_hint = " << size_hint << std::endl ; +#endif + // 1 - find if this peer already has an active chunk. + // + std::map::iterator it = _active_chunks_feed.find(peer_id) ; - for(int i=start;i<=end;++i) - chunks[i] = CHUNK_ACTIVE ; + if(it == _active_chunks_feed.end()) + { + // 1 - select an available chunk id for this peer. + // + uint32_t c ; + + switch(_strategy) + { + case FileChunksInfo::CHUNK_STRATEGY_STREAMING: c = getAvailableChunk(0,peer_id) ; // very bold!! + break ; + + case FileChunksInfo::CHUNK_STRATEGY_RANDOM: c = getAvailableChunk(rand()%_map.size(),peer_id) ; + break ; + default: +#ifdef DEBUG_FTCHUNK + std::cerr << "!!! ChunkMap::getDataChunk: error!: unknown strategy" << std::endl ; +#endif + return false ; + } + + if(c >= _map.size()) + return false ; + + // 2 - add the chunk in the list of active chunks, and mark it as being downloaded + // + uint32_t soc = sizeOfChunk(c) ; + _active_chunks_feed[peer_id] = Chunk( c*(uint64_t)_chunk_size, soc ) ; + _map[c] = FileChunksInfo::CHUNK_ACTIVE ; + _slices_to_download[c]._remains = soc ; // init the list of slices to download +#ifdef DEBUG_FTCHUNK + std::cout << "*** ChunkMap::getDataChunk: Allocating new chunk " << c << " for peer " << peer_id << std::endl ; +#endif + } +#ifdef DEBUG_FTCHUNK + else + std::cout << "*** ChunkMap::getDataChunk: Re-using chunk " << it->second._start/_chunk_size << " for peer " << peer_id << std::endl ; +#endif + + // Get the first slice of the chunk, that is at most of length size + // + _active_chunks_feed[peer_id].getSlice(size_hint,chunk) ; + _slices_to_download[chunk.offset/_chunk_size]._slices[chunk.id] = chunk.size ; + + if(_active_chunks_feed[peer_id].empty()) + _active_chunks_feed.erase(_active_chunks_feed.find(peer_id)) ; + +#ifdef DEBUG_FTCHUNK + std::cout << "*** ChunkMap::getDataChunk: returning slice " << chunk << " for peer " << peer_id << std::endl ; +#endif + return true ; } + +#ifdef A_FAIRE +void setPeerAvailabilityMap(const std::string& peer_id,const std::vector& peer_map) +{ +#ifdef DEBUG_FTCHUNK + std::cout << "Receiving new availability map for peer " << peer_id << std::endl ; +#endif + + _peers_chunks_availability[peer_id] = peer_map ; +} +#endif + +uint32_t ChunkMap::sizeOfChunk(uint32_t cid) const +{ + if(cid == _map.size()-1) + return _file_size - (_map.size()-1)*_chunk_size ; + else + return _chunk_size ; +} + +uint32_t ChunkMap::getAvailableChunk(uint32_t start_location,const std::string& peer_id) const +{ + // Very bold algorithm: checks for 1st availabe chunk for this peer starting + // from the given start location. +#ifdef A_FAIRE + std::map >::const_iterator it(_peers_chunks_availability.find(peer_id)) ; + + // Do we have records for this file source ? + // + if(it == _peers_chunks_availability.end()) + { +#ifdef DEBUG_FTCHUNK + std::cout << "No chunk map for peer " << peer_id << ": returning false" << endl ; +#endif + return _map.size() ; + } + const std::vector peer_chunks(it->second) ; +#endif + + for(uint i=0;i<_map.size();++i) + { + uint32_t j = (start_location+i)%_map.size() ; + + if(_map[j] == FileChunksInfo::CHUNK_OUTSTANDING /*&& peers_chunks[j] == CHUNK_DONE*/) + { +#ifdef DEBUG_FTCHUNK + std::cerr << "ChunkMap::getAvailableChunk: returning chunk " << j << " for peer " << peer_id << std::endl; +#endif + return j ; + } + } + +#ifdef DEBUG_FTCHUNK + std::cout << "!!! ChunkMap::getAvailableChunk: No available chunk from peer " << peer_id << ": returning false" << std::endl ; +#endif + return _map.size() ; +} + +void ChunkMap::getChunksInfo(FileChunksInfo& info) const +{ + info.file_size = _file_size ; + info.chunk_size = _chunk_size ; + info.chunks = _map ; +} + + + diff --git a/libretroshare/src/ft/ftchunkmap.h b/libretroshare/src/ft/ftchunkmap.h index 6b03ceab6..0732253cd 100644 --- a/libretroshare/src/ft/ftchunkmap.h +++ b/libretroshare/src/ft/ftchunkmap.h @@ -1,13 +1,146 @@ #pragma once +#include #include -class ChunkMap: public FileChunksInfo +// ftChunkMap: +// - handles chunk map over a complete file +// - mark down which chunk is being downloaded by which peer +// - allocate data ranges of any requested size for a given peer +// - continuing an existing chunk +// - allocating a new chunk +// +// Download mecanism: +// - ftFileCreator handles a list of active slices, and periodically resends requests every 20 sec. +// Slices have arbitrary size (less than a chunk), depending on the transfer rate. +// When receiving data, ftFileCreator shrinks its slices until they get complete. When a slice is finished, it +// notifies ftChunkMap that this slice is done. +// +// - ftChunkMap maintains two levels: +// - the chunk level (Chunks a 1MB long) with a map of who has which chunk and what locally is the state of +// each chunk +// - the slice level: each active chunk is cut into slices (basically a list of intervalls) being downloaded, and +// a remaining slice to cut off new candidates. When notified for a complete slice, ftChunkMap removed the +// corresponding acive slice. When asked a slice, ftChunkMap cuts out a slice from the remaining part of the chunk +// to download, sends the slice's coordinates and gives a unique slice id (such as the slice offset). + +// This class handles a slice of a chunk, at the level of ftFileCreator + +class ftChunk { public: - ChunkMap(uint64_t size) ; + typedef uint64_t ChunkId ; - virtual void received(uint64_t offset) ; - virtual void requested(uint64_t offset, uint32_t chunk_size) ; + ftChunk():offset(0), size(0), ts(0) {} + + friend std::ostream& operator<<(std::ostream& o,const ftChunk& f) ; + + uint64_t offset; + uint64_t size; + ChunkId id ; + time_t ts; }; +// This class handles a single chunk. Although each chunk is requested at once, +// it may be sent back into sub-chunks because of file transfer rate constraints. +// So the dataReceived function should be called to progressively complete the chunk, +// and the getChunk method should ask for a sub0chunk of a given size. +// +class Chunk +{ + public: + Chunk(): _start(0),_offset(0),_end(0) {} // only used in default std::map fillers + + Chunk(uint64_t start,uint32_t size) ; + + void getSlice(uint32_t size_hint,ftChunk& chunk) ; + + // Returns true when the chunk is complete + bool empty() const { return _offset == _end ; } + + // Array of intervalls of bytes to download. + // + uint64_t _start ; // const + uint64_t _offset ; // not const: handles the current offset within the chunk. + uint64_t _end ; // const +}; + +class ChunkDownloadInfo +{ + public: + std::map _slices ; + uint32_t _remains ; +}; + +class ChunkMap +{ + public: + typedef uint32_t ChunkNumber ; + + // Constructor. Decides what will be the size of chunks and how many there will be. + + ChunkMap(uint64_t file_size) ; + + // Returns an slice of data to be asked to the peer within a chunk. + // If a chunk is already been downloaded by this peer, take a slice at + // the beginning of this chunk, or at least where it starts. + // If not, randomly/streamly select a new chunk depending on the strategy. + // adds an entry in the chunk_ids map, and sets up 1 interval for it. + // the chunk should be available from the designated peer. + + virtual bool getDataChunk(const std::string& peer_id,uint32_t size_hint,ftChunk& chunk) ; + + // Notify received a slice of data. This needs to + // - carve in the map of chunks what is received, what is not. + // - tell which chunks are finished. For this, each interval must know what chunk number it has been attributed + // when the interval is split in the middle, the number of intervals for the chunk is increased. If the interval is + // completely covered by the data, the interval number is decreased. + + virtual void dataReceived(const ftChunk::ChunkId& c_id) ; + + // Decides how chunks are selected. + // STREAMING: the 1st chunk is always returned + // RANDOM: the beginning of a random interval is selected first. If two few intervals + // exist, the largest one is randomly split into two. + + void setStrategy(FileChunksInfo::ChunkStrategy s) { _strategy = s ; } + +#ifdef TO_DO + // Properly fills an vector of fixed size chunks with availability or download state. + // chunks is given with the proper number of chunks and we have to adapt to it. This can be used + // to display square chunks in the gui or display a blue bar of availability by collapsing info from all peers. + + void linearize(FileChunksInfo& info) const ; + + // Updates the peer's availablility map + // + void setPeerAvailabilityMap(const std::string& peer_id,const std::vector& peer_map) ; +#endif + + // Returns the total size of downloaded data in the file. + uint64_t getTotalReceived() const { return _total_downloaded ; } + + void getChunksInfo(FileChunksInfo& info) const ; + protected: + // handles what size the last chunk has. + uint32_t sizeOfChunk(uint32_t chunk_number) const ; + + // Returns the first chunk available starting from start_location for this peer_id. + // + uint32_t getAvailableChunk(uint32_t start_location,const std::string& peer_id) const ; + + private: + const uint64_t _file_size ; // total size of the file in bytes. + const uint32_t _chunk_size ; // Size of chunks. Common to all chunks. + FileChunksInfo::ChunkStrategy _strategy ; // how do we allocate new chunks + std::map _active_chunks_feed ; // vector of chunks being downloaded. Exactly one chunk per peer id. + std::map _slices_to_download ; // list of (slice id,slice size) + + std::vector _map ; // vector of chunk state over the whole file + + std::map > _peers_chunks_availability ; // what does each source peer have. + + uint64_t _total_downloaded ; +}; + + diff --git a/libretroshare/src/ft/ftcontroller.cc b/libretroshare/src/ft/ftcontroller.cc index 32ad7d06f..c136c6ca5 100644 --- a/libretroshare/src/ft/ftcontroller.cc +++ b/libretroshare/src/ft/ftcontroller.cc @@ -842,6 +842,21 @@ bool ftController::setPeerState(ftTransferModule *tm, std::string id, } +bool ftController::setChunkStrategy(const std::string& hash,FileChunksInfo::ChunkStrategy s) +{ + std::map::iterator mit; + mit=mDownloads.find(hash); + if (mit==mDownloads.end()) + { +#ifdef CONTROL_DEBUG + std::cerr<<"ftController::FileCancel file is not found in mDownloads"<second.mCreator->setChunkStrategy(s) ; + return true ; +} bool ftController::FileCancel(std::string hash) { diff --git a/libretroshare/src/ft/ftcontroller.h b/libretroshare/src/ft/ftcontroller.h index 553413a1e..f597bed15 100644 --- a/libretroshare/src/ft/ftcontroller.h +++ b/libretroshare/src/ft/ftcontroller.h @@ -132,6 +132,8 @@ bool FileRequest(std::string fname, std::string hash, uint64_t size, std::string dest, uint32_t flags, std::list &sourceIds); +bool setChunkStrategy(const std::string& hash,FileChunksInfo::ChunkStrategy s); + bool FileCancel(std::string hash); bool FileControl(std::string hash, uint32_t flags); bool FileClearCompleted(); diff --git a/libretroshare/src/ft/ftfilecreator.cc b/libretroshare/src/ft/ftfilecreator.cc index a03a0b8e9..d7237b715 100644 --- a/libretroshare/src/ft/ftfilecreator.cc +++ b/libretroshare/src/ft/ftfilecreator.cc @@ -15,8 +15,8 @@ * ***********************************************************/ -ftFileCreator::ftFileCreator(std::string path, uint64_t size, std::string -hash, uint64_t recvd): ftFileProvider(path,size,hash), chunkMap(size) +ftFileCreator::ftFileCreator(std::string path, uint64_t size, std::string hash, uint64_t recvd) + : ftFileProvider(path,size,hash), chunkMap(size) { /* * FIXME any inits to do? @@ -36,23 +36,25 @@ hash, uint64_t recvd): ftFileProvider(path,size,hash), chunkMap(size) RsStackMutex stack(ftcMutex); /********** STACK LOCKED MTX ******/ /* initialise the Transfer Lists */ - mStart = recvd; - mEnd = recvd; +// mStart = recvd; +// mEnd = recvd; - chunkMap.received(recvd) ; +#ifdef TO_DO + // we should init the chunk map with some bit array saying what is received and what is not!! + chunkMap.setTotalReceived(recvd) ; +#endif } -bool ftFileCreator::getFileData(uint64_t offset, - uint32_t &chunk_size, void *data) +bool ftFileCreator::getFileData(uint64_t offset, uint32_t &chunk_size, void *data) { { - RsStackMutex stack(ftcMutex); /********** STACK LOCKED MTX ******/ - if (offset + chunk_size > mStart) - { - /* don't have the data */ - return false; - } - } + RsStackMutex stack(ftcMutex); /********** STACK LOCKED MTX ******/ + if (offset + chunk_size > mStart) + { + /* don't have the data */ + return false; + } + } return ftFileProvider::getFileData(offset, chunk_size, data); } @@ -60,7 +62,7 @@ bool ftFileCreator::getFileData(uint64_t offset, uint64_t ftFileCreator::getRecvd() { RsStackMutex stack(ftcMutex); /********** STACK LOCKED MTX ******/ - return mStart; + return chunkMap.getTotalReceived() ; } bool ftFileCreator::addFileData(uint64_t offset, uint32_t chunk_size, void *data) @@ -206,8 +208,8 @@ int ftFileCreator::initializeFileAttrs() std::cerr << "ftFileCreator::initializeFileAttrs() File Expected Size: " << mSize << " RecvdSize: " << recvdsize << std::endl; /* start from there! */ - mStart = recvdsize; - mEnd = recvdsize; +// mStart = recvdsize; +// mEnd = recvdsize; return 1; } @@ -230,9 +232,9 @@ int ftFileCreator::locked_notifyReceived(uint64_t offset, uint32_t chunk_size) #endif /* find the chunk */ - std::map::iterator it; - it = mChunks.find(offset); - bool isFirst = false; + std::map::iterator it = mChunks.find(offset); + +// bool isFirst = false; if (it == mChunks.end()) { #ifdef FILE_DEBUG @@ -244,35 +246,37 @@ int ftFileCreator::locked_notifyReceived(uint64_t offset, uint32_t chunk_size) #endif return 0; /* ignoring */ } - else if (it == mChunks.begin()) - { - isFirst = true; - } + +// if (it == mChunks.begin()) +// { +// isFirst = true; +// } ftChunk chunk = it->second; mChunks.erase(it); - if (chunk.chunk != chunk_size) + if (chunk.size != chunk_size) { /* partial : shrink chunk */ - chunk.chunk -= chunk_size; + chunk.size -= chunk_size; chunk.offset += chunk_size; mChunks[chunk.offset] = chunk; } + else // notify the chunkmap that the slice is finished + chunkMap.dataReceived(chunk.id) ; - /* update how much has been completed */ - if (isFirst) - { - mStart = offset + chunk_size; - } +// /* update how much has been completed */ +// if (isFirst) +// { +// mStart = offset + chunk_size; +// } // update chunk map - chunkMap.received(mStart) ; - if (mChunks.size() == 0) - { - mStart = mEnd; - } +// if (mChunks.size() == 0) +// { +// mStart = mEnd; +// } /* otherwise there is another earlier block to go */ @@ -280,15 +284,25 @@ int ftFileCreator::locked_notifyReceived(uint64_t offset, uint32_t chunk_size) return 1; } +void ftFileCreator::setChunkStrategy(FileChunksInfo::ChunkStrategy s) +{ + RsStackMutex stack(ftcMutex); /********** STACK LOCKED MTX ******/ + +#ifdef FILE_DEBUG + std::cerr << "ftFileCtreator: setting chunk strategy to " << s << std::endl ; +#endif + chunkMap.setStrategy(s) ; +} + /* Returns true if more to get * But can return size = 0, if we are still waiting for the data. */ -bool ftFileCreator::getMissingChunk(uint64_t &offset, uint32_t &chunk) +bool ftFileCreator::getMissingChunk(const std::string& peer_id,uint32_t size_hint,uint64_t &offset, uint32_t& size) { RsStackMutex stack(ftcMutex); /********** STACK LOCKED MTX ******/ #ifdef FILE_DEBUG - std::cerr << "ffc::getMissingChunk(...,"<< chunk << ")"; + std::cerr << "ffc::getMissingChunk(...,"<< size_hint << ")"; std::cerr << " this: " << this; std::cerr << std::endl; locked_printChunkMap(); @@ -296,14 +310,14 @@ bool ftFileCreator::getMissingChunk(uint64_t &offset, uint32_t &chunk) /* check start point */ - if (mStart == mSize) - { -#ifdef FILE_DEBUG - std::cerr << "ffc::getMissingChunk() File Done"; - std::cerr << std::endl; -#endif - return false; - } +// if(mStart == mSize) +// { +//#ifdef FILE_DEBUG +// std::cerr << "ffc::getMissingChunk() File Done"; +// std::cerr << std::endl; +//#endif +// return false; +// } /* check for freed chunks */ time_t ts = time(NULL); @@ -316,48 +330,47 @@ bool ftFileCreator::getMissingChunk(uint64_t &offset, uint32_t &chunk) if (it->second.ts < old) { #ifdef FILE_DEBUG - std::cerr << "ffc::getMissingChunk() ReAlloc"; + std::cerr << "ffc::getMissingChunk() Re-asking for an old chunk"; std::cerr << std::endl; #endif /* retry this one */ it->second.ts = ts; - chunk = it->second.chunk; + size = it->second.size; offset = it->second.offset; return true; } } -#ifdef FILE_DEBUG - std::cerr << "ffc::getMissingChunk() new Alloc"; - std::cerr << " mStart: " << mStart << " mEnd: " << mEnd; - std::cerr << "mSize: " << mSize; - std::cerr << std::endl; -#endif - /* else allocate a new chunk */ - if (mSize - mEnd < chunk) - chunk = mSize - mEnd; - offset = mEnd; - mEnd += chunk; + ftChunk chunk ; - if (chunk > 0) - { + if(!chunkMap.getDataChunk(peer_id,size_hint,chunk)) + return false ; + +// if (mSize - mEnd < chunk) +// chunk = mSize - mEnd; +// +// offset = mEnd; +// mEnd += chunk; + +// if (chunk > 0) +// { #ifdef FILE_DEBUG - std::cerr << "ffc::getMissingChunk() Allocated " << chunk; - std::cerr << " offset: " << offset; - std::cerr << std::endl; - std::cerr << " mStart: " << mStart << " mEnd: " << mEnd; - std::cerr << "mSize: " << mSize; - std::cerr << std::endl; + std::cerr << "ffc::getMissingChunk() Retrieved new chunk: " << chunk << std::endl ; +// std::cerr << std::endl; +// std::cerr << " mStart: " << mStart << " mEnd: " << mEnd; +// std::cerr << "mSize: " << mSize; +// std::cerr << std::endl; #endif - mChunks[offset] = ftChunk(offset, chunk, ts); + mChunks[chunk.offset] = chunk ; - chunkMap.requested(offset,chunk) ; - } + offset = chunk.offset ; + size = chunk.size ; +// } return true; /* cos more data to get */ } @@ -365,7 +378,8 @@ bool ftFileCreator::getMissingChunk(uint64_t &offset, uint32_t &chunk) void ftFileCreator::getChunkMap(FileChunksInfo& info) { RsStackMutex stack(ftcMutex); /********** STACK LOCKED MTX ******/ - info = chunkMap ; + + chunkMap.getChunksInfo(info) ; } bool ftFileCreator::locked_printChunkMap() @@ -377,37 +391,16 @@ bool ftFileCreator::locked_printChunkMap() #endif /* check start point */ - std::cerr << "Size: " << mSize << " Start: " << mStart << " End: " << mEnd; - std::cerr << std::endl; - std::cerr << "\tOutstanding Chunks (in the middle)"; +// std::cerr << "Size: " << mSize << " Start: " << mStart << " End: " << mEnd; + std::cerr << "\tOutstanding Chunks:"; std::cerr << std::endl; std::map::iterator it; - time_t ts = time(NULL); + for(it = mChunks.begin(); it != mChunks.end(); it++) - { - std::cerr << "\tChunk [" << it->second.offset << "] size: "; - std::cerr << it->second.chunk; - std::cerr << " Age: " << ts - it->second.ts; - std::cerr << std::endl; - } + std::cerr << " " << it->second << std::endl ; + return true; } -/*********************************************************** -* -* ftChunk methods -* -***********************************************************/ - -ftChunk::ftChunk(uint64_t ioffset,uint64_t size,time_t now) - : offset(ioffset), chunk(size), ts(now) -{ - -} - -ftChunk::~ftChunk() -{ - -} diff --git a/libretroshare/src/ft/ftfilecreator.h b/libretroshare/src/ft/ftfilecreator.h index 2b990172a..91b14c19b 100644 --- a/libretroshare/src/ft/ftfilecreator.h +++ b/libretroshare/src/ft/ftfilecreator.h @@ -35,7 +35,6 @@ #include "ftfileprovider.h" #include "ftchunkmap.h" #include -class ftChunk; class ftFileCreator: public ftFileProvider { @@ -51,12 +50,13 @@ virtual bool getFileData(uint64_t offset, uint32_t &chunk_size, void *data); uint64_t getRecvd(); void getChunkMap(FileChunksInfo& info) ; + void setChunkStrategy(FileChunksInfo::ChunkStrategy s) ; /* * creation functions for FileCreator */ - bool getMissingChunk(uint64_t &offset, uint32_t &chunk); + bool getMissingChunk(const std::string& peer_id,uint32_t size_hint,uint64_t& offset, uint32_t& size); bool addFileData(uint64_t offset, uint32_t chunk_size, void *data); protected: @@ -79,15 +79,4 @@ private: ChunkMap chunkMap ; }; -class ftChunk { -public: - ftChunk(uint64_t,uint64_t,time_t); - ftChunk():offset(0), chunk(0), ts(0) {} - ~ftChunk(); - - uint64_t offset; - uint64_t chunk; - time_t ts; -}; - #endif // FT_FILE_CREATOR_HEADER diff --git a/libretroshare/src/ft/ftserver.cc b/libretroshare/src/ft/ftserver.cc index c46a0f709..579fce9a0 100644 --- a/libretroshare/src/ft/ftserver.cc +++ b/libretroshare/src/ft/ftserver.cc @@ -256,6 +256,11 @@ bool ftServer::FileRequest(std::string fname, std::string hash, uint64_t size, return true ; } +bool ftServer::setChunkStrategy(const std::string& hash,FileChunksInfo::ChunkStrategy s) +{ + return mFtController->setChunkStrategy(hash,s); +} + bool ftServer::FileCancel(std::string hash) { rsTurtle->stopMonitoringFileTunnels(hash) ; diff --git a/libretroshare/src/ft/ftserver.h b/libretroshare/src/ft/ftserver.h index 68f9c5885..50380b502 100644 --- a/libretroshare/src/ft/ftserver.h +++ b/libretroshare/src/ft/ftserver.h @@ -121,6 +121,7 @@ virtual bool FileRequest(std::string fname, std::string hash, uint64_t size, virtual bool FileCancel(std::string hash); virtual bool FileControl(std::string hash, uint32_t flags); virtual bool FileClearCompleted(); +virtual bool setChunkStrategy(const std::string& hash,FileChunksInfo::ChunkStrategy s) ; /*** * Control of Downloads Priority. diff --git a/libretroshare/src/ft/fttransfermodule.cc b/libretroshare/src/ft/fttransfermodule.cc index 00b53b09b..43cfe107b 100644 --- a/libretroshare/src/ft/fttransfermodule.cc +++ b/libretroshare/src/ft/fttransfermodule.cc @@ -261,8 +261,7 @@ uint32_t ftTransferModule::getDataRate(std::string peerId) //interface to client module -bool ftTransferModule::recvFileData(std::string peerId, uint64_t offset, - uint32_t chunk_size, void *data) +bool ftTransferModule::recvFileData(std::string peerId, uint64_t offset, uint32_t chunk_size, void *data) { #ifdef FT_DEBUG std::cerr << "ftTransferModule::recvFileData()"; @@ -316,18 +315,19 @@ void ftTransferModule::requestData(std::string peerId, uint64_t offset, uint32_t mMultiplexor->sendDataRequest(peerId, mHash, mSize, offset,chunk_size); } -bool ftTransferModule::getChunk(uint64_t &offset, uint32_t &chunk_size) +bool ftTransferModule::getChunk(const std::string& peer_id,uint32_t size_hint,uint64_t &offset, uint32_t &chunk_size) { #ifdef FT_DEBUG std::cerr << "ftTransferModule::getChunk()"; std::cerr << " hash: " << mHash; std::cerr << " size: " << mSize; std::cerr << " offset: " << offset; + std::cerr << " size_hint: " << size_hint; std::cerr << " chunk_size: " << chunk_size; std::cerr << std::endl; #endif - bool val = mFileCreator->getMissingChunk(offset, chunk_size); + bool val = mFileCreator->getMissingChunk(peer_id,size_hint,offset, chunk_size); #ifdef FT_DEBUG if (val) @@ -595,7 +595,9 @@ bool ftTransferModule::locked_tickPeerTransfer(peerInfo &info) info.state = PQIPEER_IDLE; return false; } - +#ifdef FT_DEBUG + std::cerr << "locked_tickPeerTransfer() actual rate (before): " << info.actualRate << ", lastTransfers=" << info.lastTransfers << std::endl ; +#endif /* update rate */ info.actualRate = info.actualRate * 0.75 + 0.25 * info.lastTransfers; info.lastTransfers = 0; @@ -617,15 +619,23 @@ bool ftTransferModule::locked_tickPeerTransfer(peerInfo &info) { if (info.mRateIncrease > 0) { +#ifdef FT_DEBUG + std::cerr << "!!! - Emergency shutdown because rttActive is true, and age is " << ts - info.rttStart << std::endl ; +#endif info.mRateIncrease = 0; + info.rttActive = false ; // I've added this to avoid being stuck when rttActive is true } } /* request at more than current rate */ uint32_t next_req = info.actualRate * (1.0 + info.mRateIncrease); #ifdef FT_DEBUG + std::cerr << "locked_tickPeerTransfer() actual rate (after): " << actualRate + << " increase factor=" << 1.0 + info.mRateIncrease + << " info.desiredRate=" << info.desiredRate + << " info.actualRate=" << info.actualRate + << ", next_req=" << next_req ; - std::cerr << "locked_tickPeerTransfer() actual rate: " << actualRate; std::cerr << std::endl; #endif @@ -667,19 +677,21 @@ bool ftTransferModule::locked_tickPeerTransfer(peerInfo &info) /* do request */ uint64_t req_offset = 0; - if (getChunk(req_offset,next_req)) + uint32_t req_size =0 ; + + if (getChunk(info.peerId,next_req,req_offset,req_size)) { - if (next_req > 0) + if (req_size > 0) { info.state = PQIPEER_DOWNLOADING; - requestData(info.peerId,req_offset,next_req); + requestData(info.peerId,req_offset,req_size); /* start next rtt measurement */ if (!info.rttActive) { info.rttStart = ts; info.rttActive = true; - info.rttOffset = req_offset + next_req; + info.rttOffset = req_offset + req_size; } } else @@ -688,9 +700,8 @@ bool ftTransferModule::locked_tickPeerTransfer(peerInfo &info) std::cerr << std::endl; } } - else mFlag = 1; - - return true ; + else + mFlag = 1; return true; } @@ -704,6 +715,8 @@ bool ftTransferModule::locked_recvPeerData(peerInfo &info, uint64_t offset, #ifdef FT_DEBUG std::cerr << "ftTransferModule::locked_recvPeerData()"; std::cerr << " peerId: " << info.peerId; + std::cerr << " rttOffset: " << info.rttOffset; + std::cerr << " lastTransfers: " << info.lastTransfers; std::cerr << " offset: " << offset; std::cerr << " chunksize: " << chunk_size; std::cerr << " data: " << data; @@ -718,36 +731,36 @@ bool ftTransferModule::locked_recvPeerData(peerInfo &info, uint64_t offset, if ((info.rttActive) && (info.rttOffset == offset + chunk_size)) { - /* update tip */ - int32_t rtt = time(NULL) - info.rttStart; + /* update tip */ + int32_t rtt = time(NULL) - info.rttStart; - /* - * FT_TM_FAST_RTT = 1 sec. mRateIncrease = 1.00 - * FT_TM_SLOW_RTT = 9 sec. mRateIncrease = 0 - * 11 sec. mRateIncrease = -0.25 - * if it is slower than this allow fast data increase. - * initial guess - linear with rtt. - * change if this leads to wild oscillations - * - */ + /* + * FT_TM_FAST_RTT = 1 sec. mRateIncrease = 1.00 + * FT_TM_SLOW_RTT = 9 sec. mRateIncrease = 0 + * 11 sec. mRateIncrease = -0.25 + * if it is slower than this allow fast data increase. + * initial guess - linear with rtt. + * change if this leads to wild oscillations + * + */ - info.mRateIncrease = (FT_TM_SLOW_RTT - rtt) * - (FT_TM_MAX_INCREASE / (FT_TM_SLOW_RTT - FT_TM_FAST_RTT)); + info.mRateIncrease = (FT_TM_SLOW_RTT - rtt) * + (FT_TM_MAX_INCREASE / (FT_TM_SLOW_RTT - FT_TM_FAST_RTT)); - if (info.mRateIncrease > FT_TM_MAX_INCREASE) - info.mRateIncrease = FT_TM_MAX_INCREASE; + if (info.mRateIncrease > FT_TM_MAX_INCREASE) + info.mRateIncrease = FT_TM_MAX_INCREASE; - if (info.mRateIncrease < FT_TM_MIN_INCREASE) - info.mRateIncrease = FT_TM_MIN_INCREASE; + if (info.mRateIncrease < FT_TM_MIN_INCREASE) + info.mRateIncrease = FT_TM_MIN_INCREASE; - info.rtt = rtt; - info.rttActive = false; + info.rtt = rtt; + info.rttActive = false; #ifdef FT_DEBUG - std::cerr << "ftTransferModule::locked_recvPeerData()"; - std::cerr << "Updated Rate based on RTT: " << rtt; - std::cerr << " Rate: " << info.mRateIncrease; - std::cerr << std::endl; + std::cerr << "ftTransferModule::locked_recvPeerData()"; + std::cerr << "Updated Rate based on RTT: " << rtt; + std::cerr << " Rate: " << info.mRateIncrease; + std::cerr << std::endl; #endif } diff --git a/libretroshare/src/ft/fttransfermodule.h b/libretroshare/src/ft/fttransfermodule.h index a75b68b11..3d6015308 100644 --- a/libretroshare/src/ft/fttransfermodule.h +++ b/libretroshare/src/ft/fttransfermodule.h @@ -55,7 +55,8 @@ class peerInfo { public: peerInfo(std::string peerId_in):peerId(peerId_in),state(PQIPEER_NOT_ONLINE),desiredRate(0),actualRate(0), - offset(0),chunkSize(0),receivedSize(0),lastTS(0), +// offset(0),chunkSize(0),receivedSize(0), + lastTS(0), recvTS(0), lastTransfers(0), nResets(0), rtt(0), rttActive(false), rttStart(0), rttOffset(0), mRateIncrease(1) @@ -64,7 +65,8 @@ public: } peerInfo(std::string peerId_in,uint32_t state_in,uint32_t maxRate_in): peerId(peerId_in),state(state_in),desiredRate(maxRate_in),actualRate(0), - offset(0),chunkSize(0),receivedSize(0),lastTS(0), +// offset(0),chunkSize(0),receivedSize(0), + lastTS(0), recvTS(0), lastTransfers(0), nResets(0), rtt(0), rttActive(false), rttStart(0), rttOffset(0), mRateIncrease(1) @@ -77,11 +79,11 @@ public: double actualRate; //current file data request - uint64_t offset; - uint32_t chunkSize; +// uint64_t offset; +// uint32_t chunkSize; //already received data size for current request - uint32_t receivedSize; +// uint32_t receivedSize; time_t lastTS; /* last Request */ time_t recvTS; /* last Recv */ @@ -147,7 +149,7 @@ public: void requestData(std::string peerId, uint64_t offset, uint32_t chunk_size); //interface to file creator - bool getChunk(uint64_t &offset, uint32_t &chunk_size); + bool getChunk(const std::string& peer_id,uint32_t size_hint,uint64_t &offset, uint32_t &chunk_size); bool storeData(uint64_t offset, uint32_t chunk_size, void *data); int tick(); diff --git a/libretroshare/src/rsiface/rsfiles.h b/libretroshare/src/rsiface/rsfiles.h index 28d0aedbe..5943d6846 100644 --- a/libretroshare/src/rsiface/rsfiles.h +++ b/libretroshare/src/rsiface/rsfiles.h @@ -110,6 +110,7 @@ virtual ~RsFiles() { return; } virtual bool FileRequest(std::string fname, std::string hash, uint64_t size, std::string dest, uint32_t flags, std::list srcIds) = 0; virtual bool FileCancel(std::string hash) = 0; +virtual bool setChunkStrategy(const std::string& hash,FileChunksInfo::ChunkStrategy) = 0; virtual bool FileControl(std::string hash, uint32_t flags) = 0; virtual bool FileClearCompleted() = 0; diff --git a/libretroshare/src/rsiface/rstypes.h b/libretroshare/src/rsiface/rstypes.h index 9b8afae01..466e5ea7f 100644 --- a/libretroshare/src/rsiface/rstypes.h +++ b/libretroshare/src/rsiface/rstypes.h @@ -252,6 +252,7 @@ class FileChunksInfo { public: enum ChunkState { CHUNK_DONE, CHUNK_ACTIVE, CHUNK_OUTSTANDING } ; + enum ChunkStrategy { CHUNK_STRATEGY_STREAMING, CHUNK_STRATEGY_RANDOM } ; uint64_t file_size ; // real size of the file uint32_t chunk_size ; // size of chunks diff --git a/retroshare-gui/src/gui/TransfersDialog.cpp b/retroshare-gui/src/gui/TransfersDialog.cpp index 8a170b136..f548ff01d 100644 --- a/retroshare-gui/src/gui/TransfersDialog.cpp +++ b/retroshare-gui/src/gui/TransfersDialog.cpp @@ -303,6 +303,16 @@ void TransfersDialog::downloadListCostumPopupMenu( QPoint point ) priorityMenu->addAction(priorityHighAct); priorityMenu->addAction(priorityAutoAct); + chunkStreamingAct = new QAction(QIcon(IMAGE_PRIORITYAUTO), tr("Streaming"), this); + connect(chunkStreamingAct, SIGNAL(triggered()), this, SLOT(chunkStreaming())); + chunkRandomAct = new QAction(QIcon(IMAGE_PRIORITYAUTO), tr("Random"), this); + connect(chunkRandomAct, SIGNAL(triggered()), this, SLOT(chunkRandom())); + + QMenu *chunkMenu = new QMenu(tr("Chunk strategy"), this); + chunkMenu->setIcon(QIcon(IMAGE_PRIORITY)); + chunkMenu->addAction(chunkStreamingAct); + chunkMenu->addAction(chunkRandomAct); + contextMnu.clear(); if (addPlayOption) { @@ -310,6 +320,7 @@ void TransfersDialog::downloadListCostumPopupMenu( QPoint point ) } contextMnu.addSeparator(); contextMnu.addMenu( priorityMenu); + contextMnu.addMenu( chunkMenu); contextMnu.addAction( pauseAct); contextMnu.addAction( resumeAct); contextMnu.addAction( cancelAct); @@ -1252,6 +1263,25 @@ void TransfersDialog::clearQueue() rsFiles->clearQueue(); } +void TransfersDialog::chunkStreaming() +{ + setChunkStrategy(FileChunksInfo::CHUNK_STRATEGY_STREAMING) ; +} +void TransfersDialog::chunkRandom() +{ + setChunkStrategy(FileChunksInfo::CHUNK_STRATEGY_RANDOM) ; +} +void TransfersDialog::setChunkStrategy(FileChunksInfo::ChunkStrategy s) +{ + QList items; + QList::iterator it; + getIdOfSelectedItems(items); + + for (it = items.begin(); it != items.end(); it ++) { + std::string hash = (*it)->data(Qt::DisplayRole).toString().toStdString(); + rsFiles->setChunkStrategy(hash, s); + } +} /* modify download priority actions */ void TransfersDialog::priorityLow() { diff --git a/retroshare-gui/src/gui/TransfersDialog.h b/retroshare-gui/src/gui/TransfersDialog.h index dd0aa3315..e71488793 100644 --- a/retroshare-gui/src/gui/TransfersDialog.h +++ b/retroshare-gui/src/gui/TransfersDialog.h @@ -30,6 +30,7 @@ #include #include +#include #include "mainpage.h" #include "RsAutoUpdatePage.h" @@ -90,6 +91,9 @@ class TransfersDialog : public RsAutoUpdatePage void priorityHigh(); void priorityAuto(); + void chunkRandom(); + void chunkStreaming(); + /** save sort indicators for next transfers display */ void saveSortIndicatorDwl(int logicalIndex, Qt::SortOrder order); void saveSortIndicatorUpl(int logicalIndex, Qt::SortOrder order); @@ -143,10 +147,13 @@ class TransfersDialog : public RsAutoUpdatePage QAction *priorityNormalAct; QAction *priorityHighAct; QAction *priorityAutoAct; + QAction *chunkRandomAct; + QAction *chunkStreamingAct; void getIdOfSelectedItems(QList& items); bool controlTransferFile(uint32_t flags); void changePriority(int priority); + void setChunkStrategy(FileChunksInfo::ChunkStrategy s) ; QTreeView *downloadList; diff --git a/retroshare-gui/src/rsiface/rsfiles.h b/retroshare-gui/src/rsiface/rsfiles.h index 28d0aedbe..5943d6846 100644 --- a/retroshare-gui/src/rsiface/rsfiles.h +++ b/retroshare-gui/src/rsiface/rsfiles.h @@ -110,6 +110,7 @@ virtual ~RsFiles() { return; } virtual bool FileRequest(std::string fname, std::string hash, uint64_t size, std::string dest, uint32_t flags, std::list srcIds) = 0; virtual bool FileCancel(std::string hash) = 0; +virtual bool setChunkStrategy(const std::string& hash,FileChunksInfo::ChunkStrategy) = 0; virtual bool FileControl(std::string hash, uint32_t flags) = 0; virtual bool FileClearCompleted() = 0; diff --git a/retroshare-gui/src/rsiface/rstypes.h b/retroshare-gui/src/rsiface/rstypes.h index 9b8afae01..466e5ea7f 100644 --- a/retroshare-gui/src/rsiface/rstypes.h +++ b/retroshare-gui/src/rsiface/rstypes.h @@ -252,6 +252,7 @@ class FileChunksInfo { public: enum ChunkState { CHUNK_DONE, CHUNK_ACTIVE, CHUNK_OUTSTANDING } ; + enum ChunkStrategy { CHUNK_STRATEGY_STREAMING, CHUNK_STRATEGY_RANDOM } ; uint64_t file_size ; // real size of the file uint32_t chunk_size ; // size of chunks