fixed the download queue, and allow queued files to start when sources get online and empty slots are present

This commit is contained in:
mr-alice 2016-11-22 23:19:34 +01:00
parent 69f503ea05
commit 47b825833a
2 changed files with 97 additions and 101 deletions

View File

@ -105,14 +105,13 @@ ftController::ftController(ftDataMultiplex *dm, p3ServiceControl *sc, uint32_t f
mTurtle(NULL),
mFtServer(NULL),
mServiceCtrl(sc),
mFtServiceId(ftServiceId),
mFtServiceType(ftServiceId),
ctrlMutex("ftController"),
doneMutex("ftController"),
mFtActive(false),
mDefaultChunkStrategy(FileChunksInfo::CHUNK_STRATEGY_PROGRESSIVE)
{
_max_active_downloads = 5 ; // default queue size
_min_prioritized_transfers = 3 ;
mDefaultEncryptionPolicy = RS_FILE_CTRL_ENCRYPTION_POLICY_PERMISSIVE;
/* TODO */
cnt = 0 ;
@ -176,7 +175,7 @@ void ftController::addFileSource(const RsFileHash& hash,const RsPeerId& peer_id)
if(it != mDownloads.end())
{
it->second->mTransfer->addFileSource(peer_id);
setPeerState(it->second->mTransfer, peer_id, FT_CNTRL_STANDARD_RATE, mServiceCtrl->isPeerConnected(mFtServiceId, peer_id ));
setPeerState(it->second->mTransfer, peer_id, FT_CNTRL_STANDARD_RATE, mServiceCtrl->isPeerConnected(mFtServiceType, peer_id ));
#ifdef CONTROL_DEBUG
std::cerr << "... added." << std::endl ;
@ -287,7 +286,7 @@ void ftController::searchForDirectSources()
for(std::list<TransferInfo>::const_iterator pit = info.peers.begin(); pit != info.peers.end(); ++pit)
if(rsPeers->servicePermissionFlags(pit->peerId) & RS_NODE_PERM_DIRECT_DL)
if(it->second->mTransfer->addFileSource(pit->peerId)) /* if the sources don't exist already - add in */
setPeerState(it->second->mTransfer, pit->peerId, FT_CNTRL_STANDARD_RATE, mServiceCtrl->isPeerConnected(mFtServiceId, pit->peerId));
setPeerState(it->second->mTransfer, pit->peerId, FT_CNTRL_STANDARD_RATE, mServiceCtrl->isPeerConnected(mFtServiceType, pit->peerId));
}
}
@ -336,8 +335,11 @@ void ftController::setPriority(const RsFileHash& hash,DwlSpeed p)
void ftController::checkDownloadQueue()
{
// We do multiple things here:
//
// 0 - make sure all transfers have a consistent value for mDownloadQueue
//
// 1 - are there queued files ?
//
// YES
// 1.1 - check for inactive files (see below).
// - select one inactive file
@ -345,7 +347,7 @@ void ftController::checkDownloadQueue()
// - remove it from turtle handling
// - move the ftFileControl to queued list
// - set the queue priority to 1+largest in the queue.
// 1.2 - pop from the queue the 1st file to come (according to priority)
// 1.2 - pop from the queue the 1st file to come (according to availability of sources, then priority)
// - enable turtle router handling for this hash
// - reset counters
// - set the file as waiting
@ -379,54 +381,70 @@ void ftController::checkDownloadQueue()
if(mDownloads.size() <= _max_active_downloads)
return ;
// Check for inactive transfers.
std::vector<ftFileControl*> inactive_transfers ;
std::vector<ftFileControl*> transfers_with_online_sources ;
std::set<RsPeerId> online_peers ;
mServiceCtrl->getPeersConnected(mFtServiceType,online_peers) ;
// Check for inactive transfers, and queued transfers with online sources.
//
time_t now = time(NULL) ;
uint32_t nb_moved = 0 ; // don't move more files than the size of the queue.
for(std::map<RsFileHash,ftFileControl*>::const_iterator it(mDownloads.begin());it!=mDownloads.end() && nb_moved <= _max_active_downloads;++it)
if( it->second->mState != ftFileControl::QUEUED
&& (it->second->mState == ftFileControl::PAUSED
for(std::map<RsFileHash,ftFileControl*>::const_iterator it(mDownloads.begin());it!=mDownloads.end() ;++it)
if( it->second->mState != ftFileControl::QUEUED && (it->second->mState == ftFileControl::PAUSED
|| now > it->second->mTransfer->lastActvTimeStamp() + (time_t)MAX_TIME_INACTIVE_REQUEUED))
inactive_transfers.push_back(it->second) ;
else if(it->second->mState == ftFileControl::QUEUED)
{
#ifdef DEBUG_DWLQUEUE
std::cerr << " - Inactive file " << it->second->mName << " at position " << it->second->mQueuePosition << " moved to end of the queue. mState=" << it->second->mState << ", time lapse=" << now - it->second->mCreator->lastActvTimeStamp() << std::endl ;
#endif
locked_bottomQueue(it->second->mQueuePosition) ;
#ifdef DEBUG_DWLQUEUE
std::cerr << " new position: " << it->second->mQueuePosition << std::endl ;
std::cerr << " new state: " << it->second->mState << std::endl ;
#endif
it->second->mTransfer->resetActvTimeStamp() ; // very important!
++nb_moved ;
}
std::list<RsPeerId> srcs ;
it->second->mTransfer->getFileSources(srcs) ;
// Check that at least _min_prioritized_transfers are assigned to non cache transfers
std::cerr << "Asserting that at least " << _min_prioritized_transfers << " are dedicated to user transfers." << std::endl;
int user_transfers = 0 ;
std::vector<uint32_t> to_move_before ;
std::vector<uint32_t> to_move_after ;
for(uint32_t p=0;p<_queue.size();++p)
for(std::list<RsPeerId>::const_iterator it2(srcs.begin());it2!=srcs.end();++it2)
if(online_peers.find(*it2) != online_peers.end())
{
if(p < _min_prioritized_transfers)
++user_transfers ; // count one more user file in the prioritized range.
else
{
if(to_move_after.size() + user_transfers >= _min_prioritized_transfers) // we caught enough transfers to move back to the top of the queue.
transfers_with_online_sources.push_back(it->second) ;
break ;
to_move_after.push_back(p) ;
}
}
uint32_t to_move = (uint32_t)std::max(0,(int)_min_prioritized_transfers - (int)user_transfers) ; // we move as many transfers as needed to get _min_prioritized_transfers user transfers.
std::cerr << " collected " << to_move << " transfers to move." << std::endl;
#ifdef DEBUG_DWLQUEUE
std::cerr << "Identified " << inactive_transfers.size() << " inactive transfer, and " << transfers_with_online_sources.size() << " queued transfers with online sources." << std::endl;
#endif
for(uint32_t i=0;i<to_move && i < to_move_after.size() && i<to_move_before.size();++i)
locked_swapQueue(to_move_before[i],to_move_after[i]) ;
// first swap as many queued transfers with online sources with inactive transfers
uint32_t i=0;
for(;i<inactive_transfers.size() && i<transfers_with_online_sources.size();++i)
{
#ifdef DEBUG_DWLQUEUE
std::cerr << " Exchanging queue position of inactive transfer " << inactive_transfers[i]->mName << " at position " << inactive_transfers[i]->mQueuePosition << " with transfer at position " << transfers_with_online_sources[i]->mQueuePosition << " which has available sources." << std::endl;
#endif
inactive_transfers[i]->mTransfer->resetActvTimeStamp() ; // very important!
transfers_with_online_sources[i]->mTransfer->resetActvTimeStamp() ; // very important!
locked_swapQueue(inactive_transfers[i]->mQueuePosition,transfers_with_online_sources[i]->mQueuePosition);
}
// now if some inactive transfers remain, put them at the end of the queue.
for(;i<inactive_transfers.size();++i)
{
#ifdef DEBUG_DWLQUEUE
std::cerr << " - Inactive file " << inactive_transfers[i]->mName << " at position " << inactive_transfers[i]->mQueuePosition << " moved to end of the queue. mState=" << inactive_transfers[i]->mState << ", time lapse=" << now - it->second->mCreator->lastActvTimeStamp() << std::endl ;
#endif
locked_bottomQueue(inactive_transfers[i]->mQueuePosition) ;
#ifdef DEBUG_DWLQUEUE
std::cerr << " new position: " << inactive_transfers[i]->mQueuePosition << std::endl ;
std::cerr << " new state: " << inactive_transfers[i]->mState << std::endl ;
#endif
inactive_transfers[i]->mTransfer->resetActvTimeStamp() ; // very important!
}
// finally, do a full swab over the queue to make sure that the expected number of downloads is met.
for(uint32_t i=0;i<mDownloadQueue.size();++i)
locked_checkQueueElement(i) ;
}
void ftController::locked_addToQueue(ftFileControl* ftfc,int add_strategy)
@ -437,38 +455,27 @@ void ftController::locked_addToQueue(ftFileControl* ftfc,int add_strategy)
switch(add_strategy)
{
default:
// Different strategies for files and cache files:
// - a min number of slots is reserved to user file transfer
// - cache files are always added after this slot.
//
case FT_FILECONTROL_QUEUE_ADD_END: _queue.push_back(ftfc) ;
locked_checkQueueElement(_queue.size()-1) ;
case FT_FILECONTROL_QUEUE_ADD_END: mDownloadQueue.push_back(ftfc) ;
locked_checkQueueElement(mDownloadQueue.size()-1) ;
break ;
}
}
void ftController::locked_queueRemove(uint32_t pos)
{
for(uint32_t p=pos;p<_queue.size()-1;++p)
for(uint32_t p=pos;p<mDownloadQueue.size()-1;++p)
{
_queue[p]=_queue[p+1] ;
mDownloadQueue[p]=mDownloadQueue[p+1] ;
locked_checkQueueElement(p) ;
}
_queue.pop_back();
mDownloadQueue.pop_back();
}
void ftController::setMinPrioritizedTransfers(uint32_t s)
{
RsStackMutex mtx(ctrlMutex) ;
_min_prioritized_transfers = s ;
}
uint32_t ftController::getMinPrioritizedTransfers()
{
RsStackMutex mtx(ctrlMutex) ;
return _min_prioritized_transfers ;
}
void ftController::setQueueSize(uint32_t s)
{
RsStackMutex mtx(ctrlMutex) ;
@ -481,7 +488,7 @@ void ftController::setQueueSize(uint32_t s)
std::cerr << "Settign new queue size to " << s << std::endl ;
#endif
for(uint32_t p=std::min(s,old_s);p<=std::max(s,old_s);++p)
if(p < _queue.size())
if(p < mDownloadQueue.size())
locked_checkQueueElement(p);
}
else
@ -521,7 +528,7 @@ void ftController::moveInQueue(const RsFileHash& hash,QueueMove mv)
locked_swapQueue(pos,pos-1) ;
break ;
case QUEUE_DOWN: if(pos < _queue.size()-1)
case QUEUE_DOWN: if(pos < mDownloadQueue.size()-1)
locked_swapQueue(pos,pos+1) ;
break ;
default:
@ -531,28 +538,28 @@ void ftController::moveInQueue(const RsFileHash& hash,QueueMove mv)
void ftController::locked_topQueue(uint32_t pos)
{
ftFileControl *tmp=_queue[pos] ;
ftFileControl *tmp=mDownloadQueue[pos] ;
for(int p=pos;p>0;--p)
{
_queue[p]=_queue[p-1] ;
mDownloadQueue[p]=mDownloadQueue[p-1] ;
locked_checkQueueElement(p) ;
}
_queue[0]=tmp ;
mDownloadQueue[0]=tmp ;
locked_checkQueueElement(0) ;
}
void ftController::locked_bottomQueue(uint32_t pos)
{
ftFileControl *tmp=_queue[pos] ;
ftFileControl *tmp=mDownloadQueue[pos] ;
for(uint32_t p=pos;p<_queue.size()-1;++p)
for(uint32_t p=pos;p<mDownloadQueue.size()-1;++p)
{
_queue[p]=_queue[p+1] ;
mDownloadQueue[p]=mDownloadQueue[p+1] ;
locked_checkQueueElement(p) ;
}
_queue[_queue.size()-1]=tmp ;
locked_checkQueueElement(_queue.size()-1) ;
mDownloadQueue[mDownloadQueue.size()-1]=tmp ;
locked_checkQueueElement(mDownloadQueue.size()-1) ;
}
void ftController::locked_swapQueue(uint32_t pos1,uint32_t pos2)
{
@ -561,9 +568,9 @@ void ftController::locked_swapQueue(uint32_t pos1,uint32_t pos2)
if(pos1==pos2)
return ;
ftFileControl *tmp = _queue[pos1] ;
_queue[pos1] = _queue[pos2] ;
_queue[pos2] = tmp;
ftFileControl *tmp = mDownloadQueue[pos1] ;
mDownloadQueue[pos1] = mDownloadQueue[pos2] ;
mDownloadQueue[pos2] = tmp;
locked_checkQueueElement(pos1) ;
locked_checkQueueElement(pos2) ;
@ -571,26 +578,26 @@ void ftController::locked_swapQueue(uint32_t pos1,uint32_t pos2)
void ftController::locked_checkQueueElement(uint32_t pos)
{
_queue[pos]->mQueuePosition = pos ;
mDownloadQueue[pos]->mQueuePosition = pos ;
if(pos < _max_active_downloads && _queue[pos]->mState != ftFileControl::PAUSED)
if(pos < _max_active_downloads && mDownloadQueue[pos]->mState != ftFileControl::PAUSED)
{
if(_queue[pos]->mState == ftFileControl::QUEUED)
_queue[pos]->mTransfer->resetActvTimeStamp() ;
if(mDownloadQueue[pos]->mState == ftFileControl::QUEUED)
mDownloadQueue[pos]->mTransfer->resetActvTimeStamp() ;
_queue[pos]->mState = ftFileControl::DOWNLOADING ;
mDownloadQueue[pos]->mState = ftFileControl::DOWNLOADING ;
if(_queue[pos]->mFlags & RS_FILE_REQ_ANONYMOUS_ROUTING)
mFtServer->activateTunnels(_queue[pos]->mHash,mDefaultEncryptionPolicy,_queue[pos]->mFlags,true);
if(mDownloadQueue[pos]->mFlags & RS_FILE_REQ_ANONYMOUS_ROUTING)
mFtServer->activateTunnels(mDownloadQueue[pos]->mHash,mDefaultEncryptionPolicy,mDownloadQueue[pos]->mFlags,true);
}
if(pos >= _max_active_downloads && _queue[pos]->mState != ftFileControl::QUEUED && _queue[pos]->mState != ftFileControl::PAUSED)
if(pos >= _max_active_downloads && mDownloadQueue[pos]->mState != ftFileControl::QUEUED && mDownloadQueue[pos]->mState != ftFileControl::PAUSED)
{
_queue[pos]->mState = ftFileControl::QUEUED ;
_queue[pos]->mCreator->closeFile() ;
mDownloadQueue[pos]->mState = ftFileControl::QUEUED ;
mDownloadQueue[pos]->mCreator->closeFile() ;
if(_queue[pos]->mFlags & RS_FILE_REQ_ANONYMOUS_ROUTING)
mFtServer->activateTunnels(_queue[pos]->mHash,mDefaultEncryptionPolicy,_queue[pos]->mFlags,false);
if(mDownloadQueue[pos]->mFlags & RS_FILE_REQ_ANONYMOUS_ROUTING)
mFtServer->activateTunnels(mDownloadQueue[pos]->mHash,mDefaultEncryptionPolicy,mDownloadQueue[pos]->mFlags,false);
}
}
@ -1119,7 +1126,7 @@ bool ftController::FileRequest(const std::string& fname, const RsFileHash& hash
std::cerr << std::endl;
#endif
(dit->second)->mTransfer->addFileSource(*it);
setPeerState(dit->second->mTransfer, *it, rate, mServiceCtrl->isPeerConnected(mFtServiceId, *it));
setPeerState(dit->second->mTransfer, *it, rate, mServiceCtrl->isPeerConnected(mFtServiceType, *it));
}
if (srcIds.empty())
@ -1214,7 +1221,7 @@ bool ftController::FileRequest(const std::string& fname, const RsFileHash& hash
std::cerr << "ftController::FileRequest() adding peer: " << *it;
std::cerr << std::endl;
#endif
setPeerState(tm, *it, rate, mServiceCtrl->isPeerConnected(mFtServiceId, *it));
setPeerState(tm, *it, rate, mServiceCtrl->isPeerConnected(mFtServiceType, *it));
}
/* add structures into the accessible data. Needs to be locked */
@ -1732,13 +1739,12 @@ void ftController::statusChange(const std::list<pqiServicePeer> &plist)
std::map<RsFileHash, ftFileControl*>::iterator it;
std::list<pqiServicePeer>::const_iterator pit;
#ifdef CONTROL_DEBUG
//#ifdef CONTROL_DEBUG
std::cerr << "ftController::statusChange()";
std::cerr << std::endl;
#endif
//#endif
for(it = mDownloads.begin(); it != mDownloads.end(); ++it)
if(it->second->mState == ftFileControl::DOWNLOADING)
{
#ifdef CONTROL_DEBUG
std::cerr << "ftController::statusChange() Updating Hash:";
@ -1777,8 +1783,11 @@ void ftController::statusChange(const std::list<pqiServicePeer> &plist)
}
}
// Now also look at turtle virtual peers.
// Now also look at turtle virtual peers, for ongoing downloads only.
//
if(it->second->mState != ftFileControl::DOWNLOADING)
continue ;
std::list<pqipeer> vlist ;
std::list<pqipeer>::const_iterator vit;
mTurtle->getSourceVirtualPeersList(it->first,vlist) ;
@ -1822,7 +1831,6 @@ void ftController::statusChange(const std::list<pqiServicePeer> &plist)
}
const std::string active_downloads_size_ss("MAX_ACTIVE_DOWNLOADS");
const std::string min_prioritized_downl_ss("MIN_PRORITIZED_DOWNLOADS");
const std::string download_dir_ss("DOWN_DIR");
const std::string partial_dir_ss("PART_DIR");
const std::string default_chunk_strategy_ss("DEFAULT_CHUNK_STRATEGY");
@ -1856,8 +1864,6 @@ bool ftController::saveList(bool &cleanup, std::list<RsItem *>& saveData)
/* basic control parameters */
std::string s ;
rs_sprintf(s, "%lu", getMinPrioritizedTransfers()) ;
configMap[min_prioritized_downl_ss] = s ;
rs_sprintf(s, "%lu", getQueueSize()) ;
configMap[active_downloads_size_ss] = s ;
configMap[download_dir_ss] = getDownloadDirectory();
@ -2107,13 +2113,6 @@ bool ftController::loadConfigMap(std::map<std::string, std::string> &configMap)
std::cerr << "Note: loading active max downloads: " << n << std::endl;
setQueueSize(n);
}
if (configMap.end() != (mit = configMap.find(min_prioritized_downl_ss)))
{
int n=3 ;
sscanf(mit->second.c_str(), "%d", &n);
std::cerr << "Note: loading min prioritized downloads: " << n << std::endl;
setMinPrioritizedTransfers(n);
}
if (configMap.end() != (mit = configMap.find(partial_dir_ss)))
{
setPartialsDirectory(mit->second);

View File

@ -164,8 +164,6 @@ class ftController: public RsTickingThread, public pqiServiceMonitor, public p3C
void clearQueue() ;
void setQueueSize(uint32_t size) ;
uint32_t getQueueSize() ;
void setMinPrioritizedTransfers(uint32_t size) ;
uint32_t getMinPrioritizedTransfers() ;
/* get Details of File Transfers */
void FileDownloads(std::list<RsFileHash> &hashs);
@ -238,7 +236,7 @@ class ftController: public RsTickingThread, public pqiServiceMonitor, public p3C
p3turtle *mTurtle ;
ftServer *mFtServer ;
p3ServiceControl *mServiceCtrl;
uint32_t mFtServiceId;
uint32_t mFtServiceType;
uint32_t mDefaultEncryptionPolicy ;
uint32_t cnt ;
@ -246,7 +244,7 @@ class ftController: public RsTickingThread, public pqiServiceMonitor, public p3C
std::map<RsFileHash, ftFileControl*> mCompleted;
std::map<RsFileHash, ftFileControl*> mDownloads;
std::vector<ftFileControl*> _queue ;
std::vector<ftFileControl*> mDownloadQueue ;
std::string mConfigPath;
std::string mDownloadPath;
@ -267,7 +265,6 @@ class ftController: public RsTickingThread, public pqiServiceMonitor, public p3C
FileChunksInfo::ChunkStrategy mDefaultChunkStrategy ;
uint32_t _max_active_downloads ; // maximum number of simultaneous downloads
uint32_t _min_prioritized_transfers ; // min number of non cache transfers in the top of the queue.
};
#endif