- Added a ChunkMap class responsible for allocating new chunks to be downloaded, according to

- a given chunk strategy
	- the availablility map of each source 
- Integrated this into ftFileCreator
- added gui menu in file transfer+right click to change the chunk strategy: streaming vs. random

Next step: 
	- loading/saving file downloading state and availability map
	- displaying chunk details in the selected transfer tab (e.g. list of currently worked chunks, and their current downloading completion)



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@1863 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2009-12-08 22:29:52 +00:00
parent de0cbd50ce
commit 25a09900e9
16 changed files with 584 additions and 173 deletions

View File

@ -1,34 +1,251 @@
#include <math.h>
#include <assert.h>
#include <stdlib.h>
#include "ftchunkmap.h"
ChunkMap::ChunkMap(uint64_t s)
std::ostream& operator<<(std::ostream& o,const ftChunk& c)
{
file_size = s ;
return o << "\tChunk [" << c.offset << "] size: " << c.size << " ChunkId: " << c.id << " Age: " << time(NULL) - c.ts ;
}
chunk_size = 1024*1024 ; // 1MB chunks
uint64_t n = s/chunk_size ;
if(s% (uint64_t)chunk_size != 0)
// Chunk: very bold implementation for now. We should compress the bits to have
// 32 of them per uint32_t value, of course!
//
Chunk::Chunk(uint64_t start,uint32_t size)
: _start(start),_offset(start),_end( (uint64_t)size + start )
{
}
void Chunk::getSlice(uint32_t size_hint,ftChunk& chunk)
{
// Take the current offset
chunk.offset = _offset ;
chunk.size = std::min(size_hint,(uint32_t)(_end-_offset)) ;
chunk.id = _offset ;
chunk.ts = time(NULL) ;
// push the slice marker into currently handled slices.
_offset += chunk.size ;
}
//uint32_t Chunk::dataReceived(const ftChunk::ChunkId cid)
//{
//#ifdef DEBUG_FTCHUNK
// std::cerr << "*** Chunk::dataReceived: slice " << cid << " finished" << std::endl ;
//#endif
// std::map<ftChunk::ChunkId,uint32_t>::iterator it( _slices_to_download.find(cid) ) ;
//
// if(it == _slices_to_download.end())
// {
// std::cerr << "!!! Chunk::dataReceived: could not find chunk " << cid << ": probably a fatal error" << std::endl ;
// return 0 ;
// }
// else
// {
// uint32_t n = it->second ;
// _slices_to_download.erase(it) ;
// return n ;
// }
//}
ChunkMap::ChunkMap(uint64_t s)
:_file_size(s),_chunk_size(1024*1024) // 1MB chunks
{
uint64_t n = s/(uint64_t)_chunk_size ;
if(s% (uint64_t)_chunk_size != 0)
++n ;
chunks.resize(n,CHUNK_OUTSTANDING) ;
_map.resize(n,FileChunksInfo::CHUNK_OUTSTANDING) ;
_total_downloaded = 0 ;
_strategy = FileChunksInfo::CHUNK_STRATEGY_STREAMING ;
#ifdef DEBUG_FTCHUNK
std::cerr << "*** ChunkMap::ChunkMap: starting new chunkmap:" << std::endl ;
std::cerr << " File size: " << s << std::endl ;
std::cerr << " Strategy: " << _strategy << std::endl ;
std::cerr << " ChunkSize: " << _chunk_size << std::endl ;
std::cerr << " Number of Chunks: " << n << std::endl ;
#endif
}
void ChunkMap::received(uint64_t offset)
void ChunkMap::dataReceived(const ftChunk::ChunkId& cid)
{
uint64_t n = offset/chunk_size ;
// 1 - find which chunk contains the received data.
//
for(uint64_t i=0;i<n;++i)
chunks[i] = CHUNK_DONE ;
// trick: cid is the chunk offset. So we use it to get the chunk number.
int n = (uint64_t)cid/_chunk_size ;
if(offset == file_size)
chunks.back() = CHUNK_DONE ;
std::map<ChunkNumber,ChunkDownloadInfo>::iterator itc(_slices_to_download.find(n)) ;
if(itc == _slices_to_download.end())
{
std::cerr << "!!! ChunkMap::dataReceived: error: ChunkId " << cid << " corresponds to chunk number " << n << ", which is not being downloaded!" << std::endl ;
assert(false) ;
return ;
}
std::map<ftChunk::ChunkId,uint32_t>::iterator it(itc->second._slices.find(cid)) ;
if(it == itc->second._slices.end())
{
std::cerr << "!!! ChunkMap::dataReceived: chunk " << cid << " is not found in slice lst of chunk number " << n << std::endl ;
assert(false) ;
return ;
}
_total_downloaded += it->second ;
itc->second._remains -= it->second ;
itc->second._slices.erase(it) ;
#ifdef DEBUG_FTCHUNK
std::cerr << "*** ChunkMap::dataReceived: received data chunk " << cid << " for chunk number " << n << ", local remains=" << itc->second._remains << ", total downloaded=" << _total_downloaded << ", remains=" << _file_size - _total_downloaded << std::endl ;
#endif
if(itc->second._remains == 0) // the chunk was completely downloaded
{
#ifdef DEBUG_FTCHUNK
std::cerr << "*** ChunkMap::dataReceived: Chunk is complete. Removing it." << std::endl ;
#endif
_map[n] = FileChunksInfo::CHUNK_DONE ;
_slices_to_download.erase(itc) ;
}
}
void ChunkMap::requested(uint64_t offset, uint32_t size)
// Warning: a chunk may be empty, but still being downloaded, so asking new slices from it
// will produce slices of size 0. This happens at the end of each chunk.
// --> I need to get slices from the next chunk, in such a case.
// --> solution:
// - have too chunks maps:
// 1 indexed by peer id to feed the getChunk method
// - chunks pushed when new chunks are needed
// - chunks removed when empty
// 1 indexed by chunk id to account for chunks being downloaded
// - chunks pushed when new chunks are needed
// - chunks removed when completely downloaded
//
bool ChunkMap::getDataChunk(const std::string& peer_id,uint32_t size_hint,ftChunk& chunk)
{
int start = offset/chunk_size ;
int end = (int)ceil((offset+size)/chunk_size) ;
#ifdef DEBUG_FTCHUNK
std::cerr << "*** ChunkMap::getDataChunk: size_hint = " << size_hint << std::endl ;
#endif
// 1 - find if this peer already has an active chunk.
//
std::map<std::string,Chunk>::iterator it = _active_chunks_feed.find(peer_id) ;
for(int i=start;i<=end;++i)
chunks[i] = CHUNK_ACTIVE ;
if(it == _active_chunks_feed.end())
{
// 1 - select an available chunk id for this peer.
//
uint32_t c ;
switch(_strategy)
{
case FileChunksInfo::CHUNK_STRATEGY_STREAMING: c = getAvailableChunk(0,peer_id) ; // very bold!!
break ;
case FileChunksInfo::CHUNK_STRATEGY_RANDOM: c = getAvailableChunk(rand()%_map.size(),peer_id) ;
break ;
default:
#ifdef DEBUG_FTCHUNK
std::cerr << "!!! ChunkMap::getDataChunk: error!: unknown strategy" << std::endl ;
#endif
return false ;
}
if(c >= _map.size())
return false ;
// 2 - add the chunk in the list of active chunks, and mark it as being downloaded
//
uint32_t soc = sizeOfChunk(c) ;
_active_chunks_feed[peer_id] = Chunk( c*(uint64_t)_chunk_size, soc ) ;
_map[c] = FileChunksInfo::CHUNK_ACTIVE ;
_slices_to_download[c]._remains = soc ; // init the list of slices to download
#ifdef DEBUG_FTCHUNK
std::cout << "*** ChunkMap::getDataChunk: Allocating new chunk " << c << " for peer " << peer_id << std::endl ;
#endif
}
#ifdef DEBUG_FTCHUNK
else
std::cout << "*** ChunkMap::getDataChunk: Re-using chunk " << it->second._start/_chunk_size << " for peer " << peer_id << std::endl ;
#endif
// Get the first slice of the chunk, that is at most of length size
//
_active_chunks_feed[peer_id].getSlice(size_hint,chunk) ;
_slices_to_download[chunk.offset/_chunk_size]._slices[chunk.id] = chunk.size ;
if(_active_chunks_feed[peer_id].empty())
_active_chunks_feed.erase(_active_chunks_feed.find(peer_id)) ;
#ifdef DEBUG_FTCHUNK
std::cout << "*** ChunkMap::getDataChunk: returning slice " << chunk << " for peer " << peer_id << std::endl ;
#endif
return true ;
}
#ifdef A_FAIRE
void setPeerAvailabilityMap(const std::string& peer_id,const std::vector<uint32_t>& peer_map)
{
#ifdef DEBUG_FTCHUNK
std::cout << "Receiving new availability map for peer " << peer_id << std::endl ;
#endif
_peers_chunks_availability[peer_id] = peer_map ;
}
#endif
uint32_t ChunkMap::sizeOfChunk(uint32_t cid) const
{
if(cid == _map.size()-1)
return _file_size - (_map.size()-1)*_chunk_size ;
else
return _chunk_size ;
}
uint32_t ChunkMap::getAvailableChunk(uint32_t start_location,const std::string& peer_id) const
{
// Very bold algorithm: checks for 1st availabe chunk for this peer starting
// from the given start location.
#ifdef A_FAIRE
std::map<std::string,std::vector<uint32_t> >::const_iterator it(_peers_chunks_availability.find(peer_id)) ;
// Do we have records for this file source ?
//
if(it == _peers_chunks_availability.end())
{
#ifdef DEBUG_FTCHUNK
std::cout << "No chunk map for peer " << peer_id << ": returning false" << endl ;
#endif
return _map.size() ;
}
const std::vector<uint32_t> peer_chunks(it->second) ;
#endif
for(uint i=0;i<_map.size();++i)
{
uint32_t j = (start_location+i)%_map.size() ;
if(_map[j] == FileChunksInfo::CHUNK_OUTSTANDING /*&& peers_chunks[j] == CHUNK_DONE*/)
{
#ifdef DEBUG_FTCHUNK
std::cerr << "ChunkMap::getAvailableChunk: returning chunk " << j << " for peer " << peer_id << std::endl;
#endif
return j ;
}
}
#ifdef DEBUG_FTCHUNK
std::cout << "!!! ChunkMap::getAvailableChunk: No available chunk from peer " << peer_id << ": returning false" << std::endl ;
#endif
return _map.size() ;
}
void ChunkMap::getChunksInfo(FileChunksInfo& info) const
{
info.file_size = _file_size ;
info.chunk_size = _chunk_size ;
info.chunks = _map ;
}

View File

@ -1,13 +1,146 @@
#pragma once
#include <map>
#include <rsiface/rstypes.h>
class ChunkMap: public FileChunksInfo
// ftChunkMap:
// - handles chunk map over a complete file
// - mark down which chunk is being downloaded by which peer
// - allocate data ranges of any requested size for a given peer
// - continuing an existing chunk
// - allocating a new chunk
//
// Download mecanism:
// - ftFileCreator handles a list of active slices, and periodically resends requests every 20 sec.
// Slices have arbitrary size (less than a chunk), depending on the transfer rate.
// When receiving data, ftFileCreator shrinks its slices until they get complete. When a slice is finished, it
// notifies ftChunkMap that this slice is done.
//
// - ftChunkMap maintains two levels:
// - the chunk level (Chunks a 1MB long) with a map of who has which chunk and what locally is the state of
// each chunk
// - the slice level: each active chunk is cut into slices (basically a list of intervalls) being downloaded, and
// a remaining slice to cut off new candidates. When notified for a complete slice, ftChunkMap removed the
// corresponding acive slice. When asked a slice, ftChunkMap cuts out a slice from the remaining part of the chunk
// to download, sends the slice's coordinates and gives a unique slice id (such as the slice offset).
// This class handles a slice of a chunk, at the level of ftFileCreator
class ftChunk
{
public:
ChunkMap(uint64_t size) ;
typedef uint64_t ChunkId ;
virtual void received(uint64_t offset) ;
virtual void requested(uint64_t offset, uint32_t chunk_size) ;
ftChunk():offset(0), size(0), ts(0) {}
friend std::ostream& operator<<(std::ostream& o,const ftChunk& f) ;
uint64_t offset;
uint64_t size;
ChunkId id ;
time_t ts;
};
// This class handles a single chunk. Although each chunk is requested at once,
// it may be sent back into sub-chunks because of file transfer rate constraints.
// So the dataReceived function should be called to progressively complete the chunk,
// and the getChunk method should ask for a sub0chunk of a given size.
//
class Chunk
{
public:
Chunk(): _start(0),_offset(0),_end(0) {} // only used in default std::map fillers
Chunk(uint64_t start,uint32_t size) ;
void getSlice(uint32_t size_hint,ftChunk& chunk) ;
// Returns true when the chunk is complete
bool empty() const { return _offset == _end ; }
// Array of intervalls of bytes to download.
//
uint64_t _start ; // const
uint64_t _offset ; // not const: handles the current offset within the chunk.
uint64_t _end ; // const
};
class ChunkDownloadInfo
{
public:
std::map<ftChunk::ChunkId,uint32_t> _slices ;
uint32_t _remains ;
};
class ChunkMap
{
public:
typedef uint32_t ChunkNumber ;
// Constructor. Decides what will be the size of chunks and how many there will be.
ChunkMap(uint64_t file_size) ;
// Returns an slice of data to be asked to the peer within a chunk.
// If a chunk is already been downloaded by this peer, take a slice at
// the beginning of this chunk, or at least where it starts.
// If not, randomly/streamly select a new chunk depending on the strategy.
// adds an entry in the chunk_ids map, and sets up 1 interval for it.
// the chunk should be available from the designated peer.
virtual bool getDataChunk(const std::string& peer_id,uint32_t size_hint,ftChunk& chunk) ;
// Notify received a slice of data. This needs to
// - carve in the map of chunks what is received, what is not.
// - tell which chunks are finished. For this, each interval must know what chunk number it has been attributed
// when the interval is split in the middle, the number of intervals for the chunk is increased. If the interval is
// completely covered by the data, the interval number is decreased.
virtual void dataReceived(const ftChunk::ChunkId& c_id) ;
// Decides how chunks are selected.
// STREAMING: the 1st chunk is always returned
// RANDOM: the beginning of a random interval is selected first. If two few intervals
// exist, the largest one is randomly split into two.
void setStrategy(FileChunksInfo::ChunkStrategy s) { _strategy = s ; }
#ifdef TO_DO
// Properly fills an vector of fixed size chunks with availability or download state.
// chunks is given with the proper number of chunks and we have to adapt to it. This can be used
// to display square chunks in the gui or display a blue bar of availability by collapsing info from all peers.
void linearize(FileChunksInfo& info) const ;
// Updates the peer's availablility map
//
void setPeerAvailabilityMap(const std::string& peer_id,const std::vector<uint32_t>& peer_map) ;
#endif
// Returns the total size of downloaded data in the file.
uint64_t getTotalReceived() const { return _total_downloaded ; }
void getChunksInfo(FileChunksInfo& info) const ;
protected:
// handles what size the last chunk has.
uint32_t sizeOfChunk(uint32_t chunk_number) const ;
// Returns the first chunk available starting from start_location for this peer_id.
//
uint32_t getAvailableChunk(uint32_t start_location,const std::string& peer_id) const ;
private:
const uint64_t _file_size ; // total size of the file in bytes.
const uint32_t _chunk_size ; // Size of chunks. Common to all chunks.
FileChunksInfo::ChunkStrategy _strategy ; // how do we allocate new chunks
std::map<std::string,Chunk> _active_chunks_feed ; // vector of chunks being downloaded. Exactly one chunk per peer id.
std::map<ChunkNumber,ChunkDownloadInfo> _slices_to_download ; // list of (slice id,slice size)
std::vector<FileChunksInfo::ChunkState> _map ; // vector of chunk state over the whole file
std::map<std::string,std::vector<uint32_t> > _peers_chunks_availability ; // what does each source peer have.
uint64_t _total_downloaded ;
};

View File

@ -842,6 +842,21 @@ bool ftController::setPeerState(ftTransferModule *tm, std::string id,
}
bool ftController::setChunkStrategy(const std::string& hash,FileChunksInfo::ChunkStrategy s)
{
std::map<std::string,ftFileControl>::iterator mit;
mit=mDownloads.find(hash);
if (mit==mDownloads.end())
{
#ifdef CONTROL_DEBUG
std::cerr<<"ftController::FileCancel file is not found in mDownloads"<<std::endl;
#endif
return false;
}
mit->second.mCreator->setChunkStrategy(s) ;
return true ;
}
bool ftController::FileCancel(std::string hash)
{

View File

@ -132,6 +132,8 @@ bool FileRequest(std::string fname, std::string hash,
uint64_t size, std::string dest, uint32_t flags,
std::list<std::string> &sourceIds);
bool setChunkStrategy(const std::string& hash,FileChunksInfo::ChunkStrategy s);
bool FileCancel(std::string hash);
bool FileControl(std::string hash, uint32_t flags);
bool FileClearCompleted();

View File

@ -15,8 +15,8 @@
*
***********************************************************/
ftFileCreator::ftFileCreator(std::string path, uint64_t size, std::string
hash, uint64_t recvd): ftFileProvider(path,size,hash), chunkMap(size)
ftFileCreator::ftFileCreator(std::string path, uint64_t size, std::string hash, uint64_t recvd)
: ftFileProvider(path,size,hash), chunkMap(size)
{
/*
* FIXME any inits to do?
@ -36,23 +36,25 @@ hash, uint64_t recvd): ftFileProvider(path,size,hash), chunkMap(size)
RsStackMutex stack(ftcMutex); /********** STACK LOCKED MTX ******/
/* initialise the Transfer Lists */
mStart = recvd;
mEnd = recvd;
// mStart = recvd;
// mEnd = recvd;
chunkMap.received(recvd) ;
#ifdef TO_DO
// we should init the chunk map with some bit array saying what is received and what is not!!
chunkMap.setTotalReceived(recvd) ;
#endif
}
bool ftFileCreator::getFileData(uint64_t offset,
uint32_t &chunk_size, void *data)
bool ftFileCreator::getFileData(uint64_t offset, uint32_t &chunk_size, void *data)
{
{
RsStackMutex stack(ftcMutex); /********** STACK LOCKED MTX ******/
if (offset + chunk_size > mStart)
{
/* don't have the data */
return false;
}
}
RsStackMutex stack(ftcMutex); /********** STACK LOCKED MTX ******/
if (offset + chunk_size > mStart)
{
/* don't have the data */
return false;
}
}
return ftFileProvider::getFileData(offset, chunk_size, data);
}
@ -60,7 +62,7 @@ bool ftFileCreator::getFileData(uint64_t offset,
uint64_t ftFileCreator::getRecvd()
{
RsStackMutex stack(ftcMutex); /********** STACK LOCKED MTX ******/
return mStart;
return chunkMap.getTotalReceived() ;
}
bool ftFileCreator::addFileData(uint64_t offset, uint32_t chunk_size, void *data)
@ -206,8 +208,8 @@ int ftFileCreator::initializeFileAttrs()
std::cerr << "ftFileCreator::initializeFileAttrs() File Expected Size: " << mSize << " RecvdSize: " << recvdsize << std::endl;
/* start from there! */
mStart = recvdsize;
mEnd = recvdsize;
// mStart = recvdsize;
// mEnd = recvdsize;
return 1;
}
@ -230,9 +232,9 @@ int ftFileCreator::locked_notifyReceived(uint64_t offset, uint32_t chunk_size)
#endif
/* find the chunk */
std::map<uint64_t, ftChunk>::iterator it;
it = mChunks.find(offset);
bool isFirst = false;
std::map<uint64_t, ftChunk>::iterator it = mChunks.find(offset);
// bool isFirst = false;
if (it == mChunks.end())
{
#ifdef FILE_DEBUG
@ -244,35 +246,37 @@ int ftFileCreator::locked_notifyReceived(uint64_t offset, uint32_t chunk_size)
#endif
return 0; /* ignoring */
}
else if (it == mChunks.begin())
{
isFirst = true;
}
// if (it == mChunks.begin())
// {
// isFirst = true;
// }
ftChunk chunk = it->second;
mChunks.erase(it);
if (chunk.chunk != chunk_size)
if (chunk.size != chunk_size)
{
/* partial : shrink chunk */
chunk.chunk -= chunk_size;
chunk.size -= chunk_size;
chunk.offset += chunk_size;
mChunks[chunk.offset] = chunk;
}
else // notify the chunkmap that the slice is finished
chunkMap.dataReceived(chunk.id) ;
/* update how much has been completed */
if (isFirst)
{
mStart = offset + chunk_size;
}
// /* update how much has been completed */
// if (isFirst)
// {
// mStart = offset + chunk_size;
// }
// update chunk map
chunkMap.received(mStart) ;
if (mChunks.size() == 0)
{
mStart = mEnd;
}
// if (mChunks.size() == 0)
// {
// mStart = mEnd;
// }
/* otherwise there is another earlier block to go
*/
@ -280,15 +284,25 @@ int ftFileCreator::locked_notifyReceived(uint64_t offset, uint32_t chunk_size)
return 1;
}
void ftFileCreator::setChunkStrategy(FileChunksInfo::ChunkStrategy s)
{
RsStackMutex stack(ftcMutex); /********** STACK LOCKED MTX ******/
#ifdef FILE_DEBUG
std::cerr << "ftFileCtreator: setting chunk strategy to " << s << std::endl ;
#endif
chunkMap.setStrategy(s) ;
}
/* Returns true if more to get
* But can return size = 0, if we are still waiting for the data.
*/
bool ftFileCreator::getMissingChunk(uint64_t &offset, uint32_t &chunk)
bool ftFileCreator::getMissingChunk(const std::string& peer_id,uint32_t size_hint,uint64_t &offset, uint32_t& size)
{
RsStackMutex stack(ftcMutex); /********** STACK LOCKED MTX ******/
#ifdef FILE_DEBUG
std::cerr << "ffc::getMissingChunk(...,"<< chunk << ")";
std::cerr << "ffc::getMissingChunk(...,"<< size_hint << ")";
std::cerr << " this: " << this;
std::cerr << std::endl;
locked_printChunkMap();
@ -296,14 +310,14 @@ bool ftFileCreator::getMissingChunk(uint64_t &offset, uint32_t &chunk)
/* check start point */
if (mStart == mSize)
{
#ifdef FILE_DEBUG
std::cerr << "ffc::getMissingChunk() File Done";
std::cerr << std::endl;
#endif
return false;
}
// if(mStart == mSize)
// {
//#ifdef FILE_DEBUG
// std::cerr << "ffc::getMissingChunk() File Done";
// std::cerr << std::endl;
//#endif
// return false;
// }
/* check for freed chunks */
time_t ts = time(NULL);
@ -316,48 +330,47 @@ bool ftFileCreator::getMissingChunk(uint64_t &offset, uint32_t &chunk)
if (it->second.ts < old)
{
#ifdef FILE_DEBUG
std::cerr << "ffc::getMissingChunk() ReAlloc";
std::cerr << "ffc::getMissingChunk() Re-asking for an old chunk";
std::cerr << std::endl;
#endif
/* retry this one */
it->second.ts = ts;
chunk = it->second.chunk;
size = it->second.size;
offset = it->second.offset;
return true;
}
}
#ifdef FILE_DEBUG
std::cerr << "ffc::getMissingChunk() new Alloc";
std::cerr << " mStart: " << mStart << " mEnd: " << mEnd;
std::cerr << "mSize: " << mSize;
std::cerr << std::endl;
#endif
/* else allocate a new chunk */
if (mSize - mEnd < chunk)
chunk = mSize - mEnd;
offset = mEnd;
mEnd += chunk;
ftChunk chunk ;
if (chunk > 0)
{
if(!chunkMap.getDataChunk(peer_id,size_hint,chunk))
return false ;
// if (mSize - mEnd < chunk)
// chunk = mSize - mEnd;
//
// offset = mEnd;
// mEnd += chunk;
// if (chunk > 0)
// {
#ifdef FILE_DEBUG
std::cerr << "ffc::getMissingChunk() Allocated " << chunk;
std::cerr << " offset: " << offset;
std::cerr << std::endl;
std::cerr << " mStart: " << mStart << " mEnd: " << mEnd;
std::cerr << "mSize: " << mSize;
std::cerr << std::endl;
std::cerr << "ffc::getMissingChunk() Retrieved new chunk: " << chunk << std::endl ;
// std::cerr << std::endl;
// std::cerr << " mStart: " << mStart << " mEnd: " << mEnd;
// std::cerr << "mSize: " << mSize;
// std::cerr << std::endl;
#endif
mChunks[offset] = ftChunk(offset, chunk, ts);
mChunks[chunk.offset] = chunk ;
chunkMap.requested(offset,chunk) ;
}
offset = chunk.offset ;
size = chunk.size ;
// }
return true; /* cos more data to get */
}
@ -365,7 +378,8 @@ bool ftFileCreator::getMissingChunk(uint64_t &offset, uint32_t &chunk)
void ftFileCreator::getChunkMap(FileChunksInfo& info)
{
RsStackMutex stack(ftcMutex); /********** STACK LOCKED MTX ******/
info = chunkMap ;
chunkMap.getChunksInfo(info) ;
}
bool ftFileCreator::locked_printChunkMap()
@ -377,37 +391,16 @@ bool ftFileCreator::locked_printChunkMap()
#endif
/* check start point */
std::cerr << "Size: " << mSize << " Start: " << mStart << " End: " << mEnd;
std::cerr << std::endl;
std::cerr << "\tOutstanding Chunks (in the middle)";
// std::cerr << "Size: " << mSize << " Start: " << mStart << " End: " << mEnd;
std::cerr << "\tOutstanding Chunks:";
std::cerr << std::endl;
std::map<uint64_t, ftChunk>::iterator it;
time_t ts = time(NULL);
for(it = mChunks.begin(); it != mChunks.end(); it++)
{
std::cerr << "\tChunk [" << it->second.offset << "] size: ";
std::cerr << it->second.chunk;
std::cerr << " Age: " << ts - it->second.ts;
std::cerr << std::endl;
}
std::cerr << " " << it->second << std::endl ;
return true;
}
/***********************************************************
*
* ftChunk methods
*
***********************************************************/
ftChunk::ftChunk(uint64_t ioffset,uint64_t size,time_t now)
: offset(ioffset), chunk(size), ts(now)
{
}
ftChunk::~ftChunk()
{
}

View File

@ -35,7 +35,6 @@
#include "ftfileprovider.h"
#include "ftchunkmap.h"
#include <map>
class ftChunk;
class ftFileCreator: public ftFileProvider
{
@ -51,12 +50,13 @@ virtual bool getFileData(uint64_t offset, uint32_t &chunk_size, void *data);
uint64_t getRecvd();
void getChunkMap(FileChunksInfo& info) ;
void setChunkStrategy(FileChunksInfo::ChunkStrategy s) ;
/*
* creation functions for FileCreator
*/
bool getMissingChunk(uint64_t &offset, uint32_t &chunk);
bool getMissingChunk(const std::string& peer_id,uint32_t size_hint,uint64_t& offset, uint32_t& size);
bool addFileData(uint64_t offset, uint32_t chunk_size, void *data);
protected:
@ -79,15 +79,4 @@ private:
ChunkMap chunkMap ;
};
class ftChunk {
public:
ftChunk(uint64_t,uint64_t,time_t);
ftChunk():offset(0), chunk(0), ts(0) {}
~ftChunk();
uint64_t offset;
uint64_t chunk;
time_t ts;
};
#endif // FT_FILE_CREATOR_HEADER

View File

@ -256,6 +256,11 @@ bool ftServer::FileRequest(std::string fname, std::string hash, uint64_t size,
return true ;
}
bool ftServer::setChunkStrategy(const std::string& hash,FileChunksInfo::ChunkStrategy s)
{
return mFtController->setChunkStrategy(hash,s);
}
bool ftServer::FileCancel(std::string hash)
{
rsTurtle->stopMonitoringFileTunnels(hash) ;

View File

@ -121,6 +121,7 @@ virtual bool FileRequest(std::string fname, std::string hash, uint64_t size,
virtual bool FileCancel(std::string hash);
virtual bool FileControl(std::string hash, uint32_t flags);
virtual bool FileClearCompleted();
virtual bool setChunkStrategy(const std::string& hash,FileChunksInfo::ChunkStrategy s) ;
/***
* Control of Downloads Priority.

View File

@ -261,8 +261,7 @@ uint32_t ftTransferModule::getDataRate(std::string peerId)
//interface to client module
bool ftTransferModule::recvFileData(std::string peerId, uint64_t offset,
uint32_t chunk_size, void *data)
bool ftTransferModule::recvFileData(std::string peerId, uint64_t offset, uint32_t chunk_size, void *data)
{
#ifdef FT_DEBUG
std::cerr << "ftTransferModule::recvFileData()";
@ -316,18 +315,19 @@ void ftTransferModule::requestData(std::string peerId, uint64_t offset, uint32_t
mMultiplexor->sendDataRequest(peerId, mHash, mSize, offset,chunk_size);
}
bool ftTransferModule::getChunk(uint64_t &offset, uint32_t &chunk_size)
bool ftTransferModule::getChunk(const std::string& peer_id,uint32_t size_hint,uint64_t &offset, uint32_t &chunk_size)
{
#ifdef FT_DEBUG
std::cerr << "ftTransferModule::getChunk()";
std::cerr << " hash: " << mHash;
std::cerr << " size: " << mSize;
std::cerr << " offset: " << offset;
std::cerr << " size_hint: " << size_hint;
std::cerr << " chunk_size: " << chunk_size;
std::cerr << std::endl;
#endif
bool val = mFileCreator->getMissingChunk(offset, chunk_size);
bool val = mFileCreator->getMissingChunk(peer_id,size_hint,offset, chunk_size);
#ifdef FT_DEBUG
if (val)
@ -595,7 +595,9 @@ bool ftTransferModule::locked_tickPeerTransfer(peerInfo &info)
info.state = PQIPEER_IDLE;
return false;
}
#ifdef FT_DEBUG
std::cerr << "locked_tickPeerTransfer() actual rate (before): " << info.actualRate << ", lastTransfers=" << info.lastTransfers << std::endl ;
#endif
/* update rate */
info.actualRate = info.actualRate * 0.75 + 0.25 * info.lastTransfers;
info.lastTransfers = 0;
@ -617,15 +619,23 @@ bool ftTransferModule::locked_tickPeerTransfer(peerInfo &info)
{
if (info.mRateIncrease > 0)
{
#ifdef FT_DEBUG
std::cerr << "!!! - Emergency shutdown because rttActive is true, and age is " << ts - info.rttStart << std::endl ;
#endif
info.mRateIncrease = 0;
info.rttActive = false ; // I've added this to avoid being stuck when rttActive is true
}
}
/* request at more than current rate */
uint32_t next_req = info.actualRate * (1.0 + info.mRateIncrease);
#ifdef FT_DEBUG
std::cerr << "locked_tickPeerTransfer() actual rate (after): " << actualRate
<< " increase factor=" << 1.0 + info.mRateIncrease
<< " info.desiredRate=" << info.desiredRate
<< " info.actualRate=" << info.actualRate
<< ", next_req=" << next_req ;
std::cerr << "locked_tickPeerTransfer() actual rate: " << actualRate;
std::cerr << std::endl;
#endif
@ -667,19 +677,21 @@ bool ftTransferModule::locked_tickPeerTransfer(peerInfo &info)
/* do request */
uint64_t req_offset = 0;
if (getChunk(req_offset,next_req))
uint32_t req_size =0 ;
if (getChunk(info.peerId,next_req,req_offset,req_size))
{
if (next_req > 0)
if (req_size > 0)
{
info.state = PQIPEER_DOWNLOADING;
requestData(info.peerId,req_offset,next_req);
requestData(info.peerId,req_offset,req_size);
/* start next rtt measurement */
if (!info.rttActive)
{
info.rttStart = ts;
info.rttActive = true;
info.rttOffset = req_offset + next_req;
info.rttOffset = req_offset + req_size;
}
}
else
@ -688,9 +700,8 @@ bool ftTransferModule::locked_tickPeerTransfer(peerInfo &info)
std::cerr << std::endl;
}
}
else mFlag = 1;
return true ;
else
mFlag = 1;
return true;
}
@ -704,6 +715,8 @@ bool ftTransferModule::locked_recvPeerData(peerInfo &info, uint64_t offset,
#ifdef FT_DEBUG
std::cerr << "ftTransferModule::locked_recvPeerData()";
std::cerr << " peerId: " << info.peerId;
std::cerr << " rttOffset: " << info.rttOffset;
std::cerr << " lastTransfers: " << info.lastTransfers;
std::cerr << " offset: " << offset;
std::cerr << " chunksize: " << chunk_size;
std::cerr << " data: " << data;
@ -718,36 +731,36 @@ bool ftTransferModule::locked_recvPeerData(peerInfo &info, uint64_t offset,
if ((info.rttActive) && (info.rttOffset == offset + chunk_size))
{
/* update tip */
int32_t rtt = time(NULL) - info.rttStart;
/* update tip */
int32_t rtt = time(NULL) - info.rttStart;
/*
* FT_TM_FAST_RTT = 1 sec. mRateIncrease = 1.00
* FT_TM_SLOW_RTT = 9 sec. mRateIncrease = 0
* 11 sec. mRateIncrease = -0.25
* if it is slower than this allow fast data increase.
* initial guess - linear with rtt.
* change if this leads to wild oscillations
*
*/
/*
* FT_TM_FAST_RTT = 1 sec. mRateIncrease = 1.00
* FT_TM_SLOW_RTT = 9 sec. mRateIncrease = 0
* 11 sec. mRateIncrease = -0.25
* if it is slower than this allow fast data increase.
* initial guess - linear with rtt.
* change if this leads to wild oscillations
*
*/
info.mRateIncrease = (FT_TM_SLOW_RTT - rtt) *
(FT_TM_MAX_INCREASE / (FT_TM_SLOW_RTT - FT_TM_FAST_RTT));
info.mRateIncrease = (FT_TM_SLOW_RTT - rtt) *
(FT_TM_MAX_INCREASE / (FT_TM_SLOW_RTT - FT_TM_FAST_RTT));
if (info.mRateIncrease > FT_TM_MAX_INCREASE)
info.mRateIncrease = FT_TM_MAX_INCREASE;
if (info.mRateIncrease > FT_TM_MAX_INCREASE)
info.mRateIncrease = FT_TM_MAX_INCREASE;
if (info.mRateIncrease < FT_TM_MIN_INCREASE)
info.mRateIncrease = FT_TM_MIN_INCREASE;
if (info.mRateIncrease < FT_TM_MIN_INCREASE)
info.mRateIncrease = FT_TM_MIN_INCREASE;
info.rtt = rtt;
info.rttActive = false;
info.rtt = rtt;
info.rttActive = false;
#ifdef FT_DEBUG
std::cerr << "ftTransferModule::locked_recvPeerData()";
std::cerr << "Updated Rate based on RTT: " << rtt;
std::cerr << " Rate: " << info.mRateIncrease;
std::cerr << std::endl;
std::cerr << "ftTransferModule::locked_recvPeerData()";
std::cerr << "Updated Rate based on RTT: " << rtt;
std::cerr << " Rate: " << info.mRateIncrease;
std::cerr << std::endl;
#endif
}

View File

@ -55,7 +55,8 @@ class peerInfo
{
public:
peerInfo(std::string peerId_in):peerId(peerId_in),state(PQIPEER_NOT_ONLINE),desiredRate(0),actualRate(0),
offset(0),chunkSize(0),receivedSize(0),lastTS(0),
// offset(0),chunkSize(0),receivedSize(0),
lastTS(0),
recvTS(0), lastTransfers(0), nResets(0),
rtt(0), rttActive(false), rttStart(0), rttOffset(0),
mRateIncrease(1)
@ -64,7 +65,8 @@ public:
}
peerInfo(std::string peerId_in,uint32_t state_in,uint32_t maxRate_in):
peerId(peerId_in),state(state_in),desiredRate(maxRate_in),actualRate(0),
offset(0),chunkSize(0),receivedSize(0),lastTS(0),
// offset(0),chunkSize(0),receivedSize(0),
lastTS(0),
recvTS(0), lastTransfers(0), nResets(0),
rtt(0), rttActive(false), rttStart(0), rttOffset(0),
mRateIncrease(1)
@ -77,11 +79,11 @@ public:
double actualRate;
//current file data request
uint64_t offset;
uint32_t chunkSize;
// uint64_t offset;
// uint32_t chunkSize;
//already received data size for current request
uint32_t receivedSize;
// uint32_t receivedSize;
time_t lastTS; /* last Request */
time_t recvTS; /* last Recv */
@ -147,7 +149,7 @@ public:
void requestData(std::string peerId, uint64_t offset, uint32_t chunk_size);
//interface to file creator
bool getChunk(uint64_t &offset, uint32_t &chunk_size);
bool getChunk(const std::string& peer_id,uint32_t size_hint,uint64_t &offset, uint32_t &chunk_size);
bool storeData(uint64_t offset, uint32_t chunk_size, void *data);
int tick();

View File

@ -110,6 +110,7 @@ virtual ~RsFiles() { return; }
virtual bool FileRequest(std::string fname, std::string hash, uint64_t size,
std::string dest, uint32_t flags, std::list<std::string> srcIds) = 0;
virtual bool FileCancel(std::string hash) = 0;
virtual bool setChunkStrategy(const std::string& hash,FileChunksInfo::ChunkStrategy) = 0;
virtual bool FileControl(std::string hash, uint32_t flags) = 0;
virtual bool FileClearCompleted() = 0;

View File

@ -252,6 +252,7 @@ class FileChunksInfo
{
public:
enum ChunkState { CHUNK_DONE, CHUNK_ACTIVE, CHUNK_OUTSTANDING } ;
enum ChunkStrategy { CHUNK_STRATEGY_STREAMING, CHUNK_STRATEGY_RANDOM } ;
uint64_t file_size ; // real size of the file
uint32_t chunk_size ; // size of chunks

View File

@ -303,6 +303,16 @@ void TransfersDialog::downloadListCostumPopupMenu( QPoint point )
priorityMenu->addAction(priorityHighAct);
priorityMenu->addAction(priorityAutoAct);
chunkStreamingAct = new QAction(QIcon(IMAGE_PRIORITYAUTO), tr("Streaming"), this);
connect(chunkStreamingAct, SIGNAL(triggered()), this, SLOT(chunkStreaming()));
chunkRandomAct = new QAction(QIcon(IMAGE_PRIORITYAUTO), tr("Random"), this);
connect(chunkRandomAct, SIGNAL(triggered()), this, SLOT(chunkRandom()));
QMenu *chunkMenu = new QMenu(tr("Chunk strategy"), this);
chunkMenu->setIcon(QIcon(IMAGE_PRIORITY));
chunkMenu->addAction(chunkStreamingAct);
chunkMenu->addAction(chunkRandomAct);
contextMnu.clear();
if (addPlayOption)
{
@ -310,6 +320,7 @@ void TransfersDialog::downloadListCostumPopupMenu( QPoint point )
}
contextMnu.addSeparator();
contextMnu.addMenu( priorityMenu);
contextMnu.addMenu( chunkMenu);
contextMnu.addAction( pauseAct);
contextMnu.addAction( resumeAct);
contextMnu.addAction( cancelAct);
@ -1252,6 +1263,25 @@ void TransfersDialog::clearQueue()
rsFiles->clearQueue();
}
void TransfersDialog::chunkStreaming()
{
setChunkStrategy(FileChunksInfo::CHUNK_STRATEGY_STREAMING) ;
}
void TransfersDialog::chunkRandom()
{
setChunkStrategy(FileChunksInfo::CHUNK_STRATEGY_RANDOM) ;
}
void TransfersDialog::setChunkStrategy(FileChunksInfo::ChunkStrategy s)
{
QList<QStandardItem *> items;
QList<QStandardItem *>::iterator it;
getIdOfSelectedItems(items);
for (it = items.begin(); it != items.end(); it ++) {
std::string hash = (*it)->data(Qt::DisplayRole).toString().toStdString();
rsFiles->setChunkStrategy(hash, s);
}
}
/* modify download priority actions */
void TransfersDialog::priorityLow()
{

View File

@ -30,6 +30,7 @@
#include <QVariant>
#include <stdint.h>
#include <rsiface/rstypes.h>
#include "mainpage.h"
#include "RsAutoUpdatePage.h"
@ -90,6 +91,9 @@ class TransfersDialog : public RsAutoUpdatePage
void priorityHigh();
void priorityAuto();
void chunkRandom();
void chunkStreaming();
/** save sort indicators for next transfers display */
void saveSortIndicatorDwl(int logicalIndex, Qt::SortOrder order);
void saveSortIndicatorUpl(int logicalIndex, Qt::SortOrder order);
@ -143,10 +147,13 @@ class TransfersDialog : public RsAutoUpdatePage
QAction *priorityNormalAct;
QAction *priorityHighAct;
QAction *priorityAutoAct;
QAction *chunkRandomAct;
QAction *chunkStreamingAct;
void getIdOfSelectedItems(QList<QStandardItem *>& items);
bool controlTransferFile(uint32_t flags);
void changePriority(int priority);
void setChunkStrategy(FileChunksInfo::ChunkStrategy s) ;
QTreeView *downloadList;

View File

@ -110,6 +110,7 @@ virtual ~RsFiles() { return; }
virtual bool FileRequest(std::string fname, std::string hash, uint64_t size,
std::string dest, uint32_t flags, std::list<std::string> srcIds) = 0;
virtual bool FileCancel(std::string hash) = 0;
virtual bool setChunkStrategy(const std::string& hash,FileChunksInfo::ChunkStrategy) = 0;
virtual bool FileControl(std::string hash, uint32_t flags) = 0;
virtual bool FileClearCompleted() = 0;

View File

@ -252,6 +252,7 @@ class FileChunksInfo
{
public:
enum ChunkState { CHUNK_DONE, CHUNK_ACTIVE, CHUNK_OUTSTANDING } ;
enum ChunkStrategy { CHUNK_STRATEGY_STREAMING, CHUNK_STRATEGY_RANDOM } ;
uint64_t file_size ; // real size of the file
uint32_t chunk_size ; // size of chunks