diff --git a/libretroshare/src/ft/ftcontroller.cc b/libretroshare/src/ft/ftcontroller.cc index fdd7ab119..0fd2014ec 100644 --- a/libretroshare/src/ft/ftcontroller.cc +++ b/libretroshare/src/ft/ftcontroller.cc @@ -105,14 +105,13 @@ ftController::ftController(ftDataMultiplex *dm, p3ServiceControl *sc, uint32_t f mTurtle(NULL), mFtServer(NULL), mServiceCtrl(sc), - mFtServiceId(ftServiceId), + mFtServiceType(ftServiceId), ctrlMutex("ftController"), doneMutex("ftController"), mFtActive(false), mDefaultChunkStrategy(FileChunksInfo::CHUNK_STRATEGY_PROGRESSIVE) { _max_active_downloads = 5 ; // default queue size - _min_prioritized_transfers = 3 ; mDefaultEncryptionPolicy = RS_FILE_CTRL_ENCRYPTION_POLICY_PERMISSIVE; /* TODO */ cnt = 0 ; @@ -176,7 +175,7 @@ void ftController::addFileSource(const RsFileHash& hash,const RsPeerId& peer_id) if(it != mDownloads.end()) { it->second->mTransfer->addFileSource(peer_id); - setPeerState(it->second->mTransfer, peer_id, FT_CNTRL_STANDARD_RATE, mServiceCtrl->isPeerConnected(mFtServiceId, peer_id )); + setPeerState(it->second->mTransfer, peer_id, FT_CNTRL_STANDARD_RATE, mServiceCtrl->isPeerConnected(mFtServiceType, peer_id )); #ifdef CONTROL_DEBUG std::cerr << "... added." << std::endl ; @@ -287,7 +286,7 @@ void ftController::searchForDirectSources() for(std::list::const_iterator pit = info.peers.begin(); pit != info.peers.end(); ++pit) if(rsPeers->servicePermissionFlags(pit->peerId) & RS_NODE_PERM_DIRECT_DL) if(it->second->mTransfer->addFileSource(pit->peerId)) /* if the sources don't exist already - add in */ - setPeerState(it->second->mTransfer, pit->peerId, FT_CNTRL_STANDARD_RATE, mServiceCtrl->isPeerConnected(mFtServiceId, pit->peerId)); + setPeerState(it->second->mTransfer, pit->peerId, FT_CNTRL_STANDARD_RATE, mServiceCtrl->isPeerConnected(mFtServiceType, pit->peerId)); } } @@ -337,7 +336,10 @@ void ftController::checkDownloadQueue() { // We do multiple things here: // + // 0 - make sure all transfers have a consistent value for mDownloadQueue + // // 1 - are there queued files ? + // // YES // 1.1 - check for inactive files (see below). // - select one inactive file @@ -345,7 +347,7 @@ void ftController::checkDownloadQueue() // - remove it from turtle handling // - move the ftFileControl to queued list // - set the queue priority to 1+largest in the queue. - // 1.2 - pop from the queue the 1st file to come (according to priority) + // 1.2 - pop from the queue the 1st file to come (according to availability of sources, then priority) // - enable turtle router handling for this hash // - reset counters // - set the file as waiting @@ -379,54 +381,70 @@ void ftController::checkDownloadQueue() if(mDownloads.size() <= _max_active_downloads) return ; - // Check for inactive transfers. + std::vector inactive_transfers ; + std::vector transfers_with_online_sources ; + + std::set online_peers ; + mServiceCtrl->getPeersConnected(mFtServiceType,online_peers) ; + + // Check for inactive transfers, and queued transfers with online sources. // time_t now = time(NULL) ; - uint32_t nb_moved = 0 ; // don't move more files than the size of the queue. - for(std::map::const_iterator it(mDownloads.begin());it!=mDownloads.end() && nb_moved <= _max_active_downloads;++it) - if( it->second->mState != ftFileControl::QUEUED - && (it->second->mState == ftFileControl::PAUSED + for(std::map::const_iterator it(mDownloads.begin());it!=mDownloads.end() ;++it) + if( it->second->mState != ftFileControl::QUEUED && (it->second->mState == ftFileControl::PAUSED || now > it->second->mTransfer->lastActvTimeStamp() + (time_t)MAX_TIME_INACTIVE_REQUEUED)) - { + inactive_transfers.push_back(it->second) ; + else if(it->second->mState == ftFileControl::QUEUED) + { + std::list srcs ; + it->second->mTransfer->getFileSources(srcs) ; + + for(std::list::const_iterator it2(srcs.begin());it2!=srcs.end();++it2) + if(online_peers.find(*it2) != online_peers.end()) + { + transfers_with_online_sources.push_back(it->second) ; + break ; + } + } + #ifdef DEBUG_DWLQUEUE - std::cerr << " - Inactive file " << it->second->mName << " at position " << it->second->mQueuePosition << " moved to end of the queue. mState=" << it->second->mState << ", time lapse=" << now - it->second->mCreator->lastActvTimeStamp() << std::endl ; + std::cerr << "Identified " << inactive_transfers.size() << " inactive transfer, and " << transfers_with_online_sources.size() << " queued transfers with online sources." << std::endl; #endif - locked_bottomQueue(it->second->mQueuePosition) ; + + // first swap as many queued transfers with online sources with inactive transfers + uint32_t i=0; + + for(;isecond->mQueuePosition << std::endl ; - std::cerr << " new state: " << it->second->mState << std::endl ; + std::cerr << " Exchanging queue position of inactive transfer " << inactive_transfers[i]->mName << " at position " << inactive_transfers[i]->mQueuePosition << " with transfer at position " << transfers_with_online_sources[i]->mQueuePosition << " which has available sources." << std::endl; #endif - it->second->mTransfer->resetActvTimeStamp() ; // very important! - ++nb_moved ; - } + inactive_transfers[i]->mTransfer->resetActvTimeStamp() ; // very important! + transfers_with_online_sources[i]->mTransfer->resetActvTimeStamp() ; // very important! - // Check that at least _min_prioritized_transfers are assigned to non cache transfers + locked_swapQueue(inactive_transfers[i]->mQueuePosition,transfers_with_online_sources[i]->mQueuePosition); + } - std::cerr << "Asserting that at least " << _min_prioritized_transfers << " are dedicated to user transfers." << std::endl; + // now if some inactive transfers remain, put them at the end of the queue. - int user_transfers = 0 ; - std::vector to_move_before ; - std::vector to_move_after ; - - for(uint32_t p=0;p<_queue.size();++p) + for(;i= _min_prioritized_transfers) // we caught enough transfers to move back to the top of the queue. - break ; - - to_move_after.push_back(p) ; - } +#ifdef DEBUG_DWLQUEUE + std::cerr << " - Inactive file " << inactive_transfers[i]->mName << " at position " << inactive_transfers[i]->mQueuePosition << " moved to end of the queue. mState=" << inactive_transfers[i]->mState << ", time lapse=" << now - it->second->mCreator->lastActvTimeStamp() << std::endl ; +#endif + locked_bottomQueue(inactive_transfers[i]->mQueuePosition) ; +#ifdef DEBUG_DWLQUEUE + std::cerr << " new position: " << inactive_transfers[i]->mQueuePosition << std::endl ; + std::cerr << " new state: " << inactive_transfers[i]->mState << std::endl ; +#endif + inactive_transfers[i]->mTransfer->resetActvTimeStamp() ; // very important! } - uint32_t to_move = (uint32_t)std::max(0,(int)_min_prioritized_transfers - (int)user_transfers) ; // we move as many transfers as needed to get _min_prioritized_transfers user transfers. - std::cerr << " collected " << to_move << " transfers to move." << std::endl; + // finally, do a full swab over the queue to make sure that the expected number of downloads is met. - for(uint32_t i=0;i0;--p) { - _queue[p]=_queue[p-1] ; + mDownloadQueue[p]=mDownloadQueue[p-1] ; locked_checkQueueElement(p) ; } - _queue[0]=tmp ; + mDownloadQueue[0]=tmp ; locked_checkQueueElement(0) ; } void ftController::locked_bottomQueue(uint32_t pos) { - ftFileControl *tmp=_queue[pos] ; + ftFileControl *tmp=mDownloadQueue[pos] ; - for(uint32_t p=pos;p<_queue.size()-1;++p) + for(uint32_t p=pos;pmQueuePosition = pos ; + mDownloadQueue[pos]->mQueuePosition = pos ; - if(pos < _max_active_downloads && _queue[pos]->mState != ftFileControl::PAUSED) + if(pos < _max_active_downloads && mDownloadQueue[pos]->mState != ftFileControl::PAUSED) { - if(_queue[pos]->mState == ftFileControl::QUEUED) - _queue[pos]->mTransfer->resetActvTimeStamp() ; + if(mDownloadQueue[pos]->mState == ftFileControl::QUEUED) + mDownloadQueue[pos]->mTransfer->resetActvTimeStamp() ; - _queue[pos]->mState = ftFileControl::DOWNLOADING ; + mDownloadQueue[pos]->mState = ftFileControl::DOWNLOADING ; - if(_queue[pos]->mFlags & RS_FILE_REQ_ANONYMOUS_ROUTING) - mFtServer->activateTunnels(_queue[pos]->mHash,mDefaultEncryptionPolicy,_queue[pos]->mFlags,true); + if(mDownloadQueue[pos]->mFlags & RS_FILE_REQ_ANONYMOUS_ROUTING) + mFtServer->activateTunnels(mDownloadQueue[pos]->mHash,mDefaultEncryptionPolicy,mDownloadQueue[pos]->mFlags,true); } - if(pos >= _max_active_downloads && _queue[pos]->mState != ftFileControl::QUEUED && _queue[pos]->mState != ftFileControl::PAUSED) + if(pos >= _max_active_downloads && mDownloadQueue[pos]->mState != ftFileControl::QUEUED && mDownloadQueue[pos]->mState != ftFileControl::PAUSED) { - _queue[pos]->mState = ftFileControl::QUEUED ; - _queue[pos]->mCreator->closeFile() ; + mDownloadQueue[pos]->mState = ftFileControl::QUEUED ; + mDownloadQueue[pos]->mCreator->closeFile() ; - if(_queue[pos]->mFlags & RS_FILE_REQ_ANONYMOUS_ROUTING) - mFtServer->activateTunnels(_queue[pos]->mHash,mDefaultEncryptionPolicy,_queue[pos]->mFlags,false); + if(mDownloadQueue[pos]->mFlags & RS_FILE_REQ_ANONYMOUS_ROUTING) + mFtServer->activateTunnels(mDownloadQueue[pos]->mHash,mDefaultEncryptionPolicy,mDownloadQueue[pos]->mFlags,false); } } @@ -1119,7 +1126,7 @@ bool ftController::FileRequest(const std::string& fname, const RsFileHash& hash std::cerr << std::endl; #endif (dit->second)->mTransfer->addFileSource(*it); - setPeerState(dit->second->mTransfer, *it, rate, mServiceCtrl->isPeerConnected(mFtServiceId, *it)); + setPeerState(dit->second->mTransfer, *it, rate, mServiceCtrl->isPeerConnected(mFtServiceType, *it)); } if (srcIds.empty()) @@ -1214,7 +1221,7 @@ bool ftController::FileRequest(const std::string& fname, const RsFileHash& hash std::cerr << "ftController::FileRequest() adding peer: " << *it; std::cerr << std::endl; #endif - setPeerState(tm, *it, rate, mServiceCtrl->isPeerConnected(mFtServiceId, *it)); + setPeerState(tm, *it, rate, mServiceCtrl->isPeerConnected(mFtServiceType, *it)); } /* add structures into the accessible data. Needs to be locked */ @@ -1732,13 +1739,12 @@ void ftController::statusChange(const std::list &plist) std::map::iterator it; std::list::const_iterator pit; -#ifdef CONTROL_DEBUG +//#ifdef CONTROL_DEBUG std::cerr << "ftController::statusChange()"; std::cerr << std::endl; -#endif +//#endif for(it = mDownloads.begin(); it != mDownloads.end(); ++it) - if(it->second->mState == ftFileControl::DOWNLOADING) { #ifdef CONTROL_DEBUG std::cerr << "ftController::statusChange() Updating Hash:"; @@ -1777,8 +1783,11 @@ void ftController::statusChange(const std::list &plist) } } - // Now also look at turtle virtual peers. + // Now also look at turtle virtual peers, for ongoing downloads only. // + if(it->second->mState != ftFileControl::DOWNLOADING) + continue ; + std::list vlist ; std::list::const_iterator vit; mTurtle->getSourceVirtualPeersList(it->first,vlist) ; @@ -1822,7 +1831,6 @@ void ftController::statusChange(const std::list &plist) } const std::string active_downloads_size_ss("MAX_ACTIVE_DOWNLOADS"); -const std::string min_prioritized_downl_ss("MIN_PRORITIZED_DOWNLOADS"); const std::string download_dir_ss("DOWN_DIR"); const std::string partial_dir_ss("PART_DIR"); const std::string default_chunk_strategy_ss("DEFAULT_CHUNK_STRATEGY"); @@ -1856,8 +1864,6 @@ bool ftController::saveList(bool &cleanup, std::list& saveData) /* basic control parameters */ std::string s ; - rs_sprintf(s, "%lu", getMinPrioritizedTransfers()) ; - configMap[min_prioritized_downl_ss] = s ; rs_sprintf(s, "%lu", getQueueSize()) ; configMap[active_downloads_size_ss] = s ; configMap[download_dir_ss] = getDownloadDirectory(); @@ -2107,13 +2113,6 @@ bool ftController::loadConfigMap(std::map &configMap) std::cerr << "Note: loading active max downloads: " << n << std::endl; setQueueSize(n); } - if (configMap.end() != (mit = configMap.find(min_prioritized_downl_ss))) - { - int n=3 ; - sscanf(mit->second.c_str(), "%d", &n); - std::cerr << "Note: loading min prioritized downloads: " << n << std::endl; - setMinPrioritizedTransfers(n); - } if (configMap.end() != (mit = configMap.find(partial_dir_ss))) { setPartialsDirectory(mit->second); diff --git a/libretroshare/src/ft/ftcontroller.h b/libretroshare/src/ft/ftcontroller.h index c1ea26d0a..8ed2bcb8c 100644 --- a/libretroshare/src/ft/ftcontroller.h +++ b/libretroshare/src/ft/ftcontroller.h @@ -164,8 +164,6 @@ class ftController: public RsTickingThread, public pqiServiceMonitor, public p3C void clearQueue() ; void setQueueSize(uint32_t size) ; uint32_t getQueueSize() ; - void setMinPrioritizedTransfers(uint32_t size) ; - uint32_t getMinPrioritizedTransfers() ; /* get Details of File Transfers */ void FileDownloads(std::list &hashs); @@ -238,7 +236,7 @@ class ftController: public RsTickingThread, public pqiServiceMonitor, public p3C p3turtle *mTurtle ; ftServer *mFtServer ; p3ServiceControl *mServiceCtrl; - uint32_t mFtServiceId; + uint32_t mFtServiceType; uint32_t mDefaultEncryptionPolicy ; uint32_t cnt ; @@ -246,7 +244,7 @@ class ftController: public RsTickingThread, public pqiServiceMonitor, public p3C std::map mCompleted; std::map mDownloads; - std::vector _queue ; + std::vector mDownloadQueue ; std::string mConfigPath; std::string mDownloadPath; @@ -267,7 +265,6 @@ class ftController: public RsTickingThread, public pqiServiceMonitor, public p3C FileChunksInfo::ChunkStrategy mDefaultChunkStrategy ; uint32_t _max_active_downloads ; // maximum number of simultaneous downloads - uint32_t _min_prioritized_transfers ; // min number of non cache transfers in the top of the queue. }; #endif