From c7ecf8c46df3c18787401ef5f9f6462748ae7f93 Mon Sep 17 00:00:00 2001 From: alexandrut Date: Sun, 26 Jul 2009 15:00:29 +0000 Subject: [PATCH] add implementation for priority download queue git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@1432 b45a01b8-16f6-495d-af2f-9b41ad6348cc --- libretroshare/src/ft/ftdwlqueue.cc | 265 ++++++++++++++++++ libretroshare/src/ft/ftdwlqueue.h | 106 +++++++ libretroshare/src/ft/ftserver.cc | 38 ++- libretroshare/src/ft/ftserver.h | 12 + libretroshare/src/libretroshare.pro | 2 + libretroshare/src/rsiface/rsfiles.h | 9 +- retroshare-gui/src/rsiface/RemoteDirModel.cpp | 2 +- retroshare-gui/src/rsiface/rsfiles.h | 9 +- 8 files changed, 438 insertions(+), 5 deletions(-) create mode 100644 libretroshare/src/ft/ftdwlqueue.cc create mode 100644 libretroshare/src/ft/ftdwlqueue.h diff --git a/libretroshare/src/ft/ftdwlqueue.cc b/libretroshare/src/ft/ftdwlqueue.cc new file mode 100644 index 000000000..ce3c769cb --- /dev/null +++ b/libretroshare/src/ft/ftdwlqueue.cc @@ -0,0 +1,265 @@ +/* + * ftdwlqueue.cc + * + * Created on: Jul 20, 2009 + * Author: alexandrut + */ + +#include "ftdwlqueue.h" +#include "ftserver.h" + +#include + +/*#define DEBUG_QUEUE 1*/ + +ftDwlQueue::ftDwlQueue(ftController *ftc, unsigned int downloadLimit, unsigned int retryLimit) + : mFtController(ftc), downloadLimit(downloadLimit), retryLimit(retryLimit) { + return; +} + +ftDwlQueue::~ftDwlQueue() { + return; +} + +void ftDwlQueue::run() +{ +#ifdef DEBUG_QUEUE + std::cerr << "ftDwlQueue::run() started" << std::endl; +#endif + + while (1) { + +#ifdef WIN_32 + Sleep(1000); +#else + sleep(1); +#endif + + unsigned int sDwl = totalSystemDwl(); + unsigned int qDwl = totalQueuedDwl(); + + /* we have to know if one ore more downloads are + * paused in the queue and the next of them will + * not exceed download limit*/ + if (!(qDwl && (sDwl - 1 < downloadLimit))) continue; + + DwlDetails details; + if (!getNext(details)) continue; + + /* if the download was puased restart it + * else try a new request for download it */ + + if (details.paused == true) { + rsFiles->FileControl(details.hash, RS_FILE_CTRL_START); + } else { + if (!mFtController->FileRequest(details.fname, details.hash, details.count, details.dest, details.flags, details.srcIds)) { + if (details.retries < retryLimit - 1) { + details.retries ++; + if (details.priority > 0) { + details.priority = (DwlPriority) (details.priority - 1); + } + details.paused = false; + + prmtx.lock(); { + priorities.push_back(details); + priorities.sort(PriorityCompare()); + } + prmtx.unlock(); + } + } + } + } +} + +void ftDwlQueue::insertDownload(const DwlDetails & details) { + DwlDetails _details(details); + +#ifdef DEBUG_QUEUE + std::list::iterator it; + std::cerr << "ftDwlQueue::insertDownload(" + << _details.fname << "," + << _details.hash << "," + << _details.count << "," + << _details.dest << "," + << _details.flags << ",<"; + + for(it = _details.srcIds.begin(); it != _details.srcIds.end(); it ++) { + std::cerr << *it << ","; + } + std::cerr << ">)"; + std::cerr << std::endl; +#endif + + if (!mFtController->FileRequest(_details.fname, _details.hash, _details.count, _details.dest, _details.flags, _details.srcIds)) { + /* reque the download but with lower priority */ + + if (_details.retries < (retryLimit - 1)) { + _details.retries ++; + if (_details.priority > 0) { + _details.priority = (DwlPriority) (_details.priority - 1); + } + _details.paused = false; + + prmtx.lock(); { + priorities.push_back(_details); + priorities.sort(PriorityCompare()); + } + prmtx.unlock(); + } + } else { + /* continue a download only if queue is empty - no + * other paused dwls are waiting - and the number + * of downloads are not exceeding the limit, else + * stop it and put in queue */ + + unsigned int sDwl = totalSystemDwl(); + + RsStackMutex stack(prmtx); + if ((!priorities.empty()) || (sDwl >= downloadLimit)) { + rsFiles->FileControl(_details.hash, RS_FILE_CTRL_PAUSE); + _details.paused = true; + + priorities.push_back(_details); + priorities.sort(PriorityCompare()); + } + } +} + +bool ftDwlQueue::getNext(DwlDetails & details) { + RsStackMutex stack(prmtx); + + if (!priorities.empty()) { + details = priorities.front(); + priorities.pop_front(); +#ifdef DEBUG_QUEUE + std::cerr << "ftDwlQueue::getNext() file: " << details.fname + << " priority: " << details.priority << std::endl; +#endif + + return true; + } + + return false; +} + +bool ftDwlQueue::peekAtNext(DwlDetails & details) { + RsStackMutex stack(prmtx); + + if (!priorities.empty()) { + details = priorities.front(); +#ifdef DEBUG_QUEUE + std::cerr << "ftDwlQueue::peekAtNext() file: " << details.fname + << " priority: " << details.priority << std::endl; +#endif + + return true; + } + + return false; +} + +bool ftDwlQueue::changePriority(const std::string hash, DwlPriority priority) { + RsStackMutex stack(prmtx); + + std::list::iterator it; + for (it = priorities.begin(); it != priorities.end(); it ++) { + if (it->hash == hash) { + it->priority = priority; + priorities.sort(PriorityCompare()); +#ifdef DEBUG_QUEUE + std::cerr << "ftDwlQueue::changePriority() file: " << hash + << " new priority: " << it->priority << std::endl; +#endif + + return true; + } + } + + return false; +} + +bool ftDwlQueue::getPriority(const std::string hash, DwlPriority & priority) { + RsStackMutex stack(prmtx); + + std::list::const_iterator it; + for (it = priorities.begin(); it != priorities.end(); it ++) { + if (it->hash == hash) { + priority = it->priority; +#ifdef DEBUG_QUEUE + std::cerr << "ftDwlQueue::getPriority() file: " << hash + << " priority: " << priority << std::endl; +#endif + + return true; + } + } + + return false; +} + +bool ftDwlQueue::clearDownload(const std::string hash) { + RsStackMutex stack(prmtx); + + std::list::iterator it; + for (it = priorities.begin(); it != priorities.end(); it ++) { + if (it->hash == hash) { + it = priorities.erase(it); +#ifdef DEBUG_QUEUE + std::cerr << "ftDwlQueue::clearDownload() file: " << hash << std::endl; +#endif + + return true; + } + } + + return false; +} + +void ftDwlQueue::clearQueue() { + RsStackMutex stack(prmtx); + +#ifdef DEBUG_QUEUE + std::cerr << "ftDwlQueue::clearQueue()" << std::endl; +#endif + priorities.clear(); +} + +unsigned int ftDwlQueue::totalQueuedDwl() { + RsStackMutex stack(prmtx); + + /* count only paused dwls from the queue */ + int total = 0; + std::list::iterator it; + for (it = priorities.begin(); it != priorities.end(); it ++) { + if (it->paused) { + total ++; + } + } + + return total; +} + +unsigned int ftDwlQueue::totalSystemDwl() { + unsigned int totalDwl = 0; + + std::list hashes; + std::list::iterator it; + + rsFiles->FileDownloads(hashes); + + /* count the number of downloading files */ + for (it = hashes.begin(); it != hashes.end(); it ++) { + uint32_t flags = RS_FILE_HINTS_DOWNLOAD; + FileInfo info; + + if (!rsFiles->FileDetails(*it, flags, info)) continue; + + /* i'm not sure here what other types should be counted + * but state waiting is very important here - dwls that + * are just requested but not in downloading state */ + if (info.downloadStatus == FT_STATE_DOWNLOADING || info.downloadStatus == FT_STATE_WAITING) + totalDwl ++; + } + + return totalDwl; +} diff --git a/libretroshare/src/ft/ftdwlqueue.h b/libretroshare/src/ft/ftdwlqueue.h new file mode 100644 index 000000000..8b70770ee --- /dev/null +++ b/libretroshare/src/ft/ftdwlqueue.h @@ -0,0 +1,106 @@ +/* + * ftdwlqueue.h + * + * Created on: Jul 22, 2009 + * Author: alexandrut + */ + +#ifndef FTDWLQUEUE_H_ +#define FTDWLQUEUE_H_ + +#include "util/rsthreads.h" +#include "ftcontroller.h" + +#include +#include + +enum DwlPriority { Low = 0, Normal, High, Auto }; + +/* class which encapsulates download details */ +class DwlDetails { +public: + DwlDetails() { return; } + DwlDetails(std::string fname, std::string hash, int count, std::string dest, + uint32_t flags, std::list srcIds, DwlPriority priority) + : fname(fname), hash(hash), count(count), dest(dest), flags(flags), + srcIds(srcIds), retries(0), priority(priority), paused(false) { + return; + } + + /* download details */ + std::string fname; + std::string hash; + int count; + std::string dest; + uint32_t flags; + std::list srcIds; + unsigned int retries; + + /* internally used in download queue */ + DwlPriority priority; + bool paused; +}; + +/* comparator class used when sorting list */ +class PriorityCompare { +public: + PriorityCompare(bool reverse = false) + : reverse(reverse) { return; } + bool operator()(const DwlDetails & l, const DwlDetails & r) { + if (reverse) {return (l.priority < r.priority);} + else return (l.priority > r.priority); + } + +private: + bool reverse; +}; + +/* base class for a download queue with + * default actions for a priority queue */ +class DwlQueue { +public: + /* specific actions for a priority queue */ + virtual void insertDownload(const DwlDetails & details) = 0; + virtual bool getNext(DwlDetails & details) = 0; + virtual bool peekAtNext(DwlDetails & details) = 0; + + /* administrative actions */ + virtual bool changePriority(const std::string hash, DwlPriority priority) = 0; + virtual bool getPriority(const std::string hash, DwlPriority & priority) = 0; + virtual bool clearDownload(const std::string hash) = 0; + virtual void clearQueue() = 0; +}; + +/* general class for download queue which + * contains the a download priority list */ +class ftDwlQueue : public DwlQueue, public RsThread { +public: + ftDwlQueue(ftController *ftc, unsigned int downloadLimit = 7, unsigned int retryLimit = 10); + virtual ~ftDwlQueue(); + + /* from thread interface */ + virtual void run(); + + /* from download queue interface */ + virtual void insertDownload(const DwlDetails & details); + virtual bool getNext(DwlDetails & details); + virtual bool peekAtNext(DwlDetails & details); + virtual bool changePriority(const std::string hash, DwlPriority priority); + virtual bool getPriority(const std::string hash, DwlPriority & priority); + virtual bool clearDownload(const std::string hash); + virtual void clearQueue(); + +private: + unsigned int downloadLimit; + unsigned int retryLimit; + + ftController *mFtController; + + RsMutex prmtx; + std::list priorities; + + unsigned int totalQueuedDwl(); + unsigned int totalSystemDwl(); +}; + +#endif /* FTDWLQUEUE_H_ */ diff --git a/libretroshare/src/ft/ftserver.cc b/libretroshare/src/ft/ftserver.cc index 2d5df009f..b985839c4 100644 --- a/libretroshare/src/ft/ftserver.cc +++ b/libretroshare/src/ft/ftserver.cc @@ -32,6 +32,7 @@ const int ftserverzone = 29539; #include "ft/ftcontroller.h" #include "ft/ftfileprovider.h" #include "ft/ftdatamultiplex.h" +#include "ft/ftdwlqueue.h" #include "turtle/p3turtle.h" @@ -142,6 +143,8 @@ void ftServer::SetupFtServer(NotifyBase *cb) mConnMgr->addMonitor(mFtController); mConnMgr->addMonitor(mCacheStrapper); + mFtDwlQueue = new ftDwlQueue(mFtController); + return; } @@ -174,6 +177,9 @@ void ftServer::StartupThreads() /* Dataplex */ mFtDataplex->start(); + /* Download Queue */ + mFtDwlQueue->start(); + /* start own thread */ start(); } @@ -236,8 +242,10 @@ bool ftServer::FileRequest(std::string fname, std::string hash, uint64_t size, std::string dest, uint32_t flags, std::list srcIds) { std::cerr << "Requesting " << fname << std::endl ; - return mFtController->FileRequest(fname, hash, size, - dest, flags, srcIds); +// return mFtController->FileRequest(fname, hash, size, +// dest, flags, srcIds); + const DwlDetails details(fname, hash, size, dest, flags, srcIds, Normal); + mFtDwlQueue->insertDownload(details); } bool ftServer::FileCancel(std::string hash) @@ -255,6 +263,32 @@ bool ftServer::FileClearCompleted() return mFtController->FileClearCompleted(); } + /* Control of Downloads Priority. */ +bool ftServer::changePriority(const std::string hash, int priority) +{ + return mFtDwlQueue->changePriority(hash, (DwlPriority) priority); +} + +bool ftServer::getPriority(const std::string hash, int & priority) +{ + DwlPriority _priority; + int ret = mFtDwlQueue->getPriority(hash, _priority); + if (ret) { + priority = _priority; + } + + return ret; +} + +bool ftServer::clearDownload(const std::string hash) +{ + return mFtDwlQueue->clearDownload(hash); +} + +void ftServer::clearQueue() +{ + mFtDwlQueue->clearQueue(); +} /* Directory Handling */ void ftServer::setDownloadDirectory(std::string path) diff --git a/libretroshare/src/ft/ftserver.h b/libretroshare/src/ft/ftserver.h index 96a3e9e39..21b01bfb4 100644 --- a/libretroshare/src/ft/ftserver.h +++ b/libretroshare/src/ft/ftserver.h @@ -68,6 +68,8 @@ class ftFileSearch; class ftDataMultiplex; class p3turtle; +class ftDwlQueue; + class ftServer: public RsFiles, public ftDataSend, public RsThread { @@ -120,6 +122,14 @@ virtual bool FileCancel(std::string hash); virtual bool FileControl(std::string hash, uint32_t flags); virtual bool FileClearCompleted(); +/*** + * Control of Downloads Priority. + ***/ +virtual bool changePriority(const std::string hash, int priority); +virtual bool getPriority(const std::string hash, int & priority); +virtual bool clearDownload(const std::string hash); +virtual void clearQueue(); + /*** * Download/Upload Details ***/ @@ -242,6 +252,8 @@ bool loadConfigMap(std::map &configMap); ftFileSearch *mFtSearch; + ftDwlQueue *mFtDwlQueue; + RsMutex srvMutex; std::string mConfigPath; std::string mDownloadPath; diff --git a/libretroshare/src/libretroshare.pro b/libretroshare/src/libretroshare.pro index a4bb6093a..2f9e2879d 100644 --- a/libretroshare/src/libretroshare.pro +++ b/libretroshare/src/libretroshare.pro @@ -126,6 +126,7 @@ HEADERS += dbase/cachestrapper.h \ ft/ftsearch.h \ ft/ftserver.h \ ft/fttransfermodule.h \ + ft/ftdwlqueue.h \ pqi/authssl.h \ pqi/p3authmgr.h \ pqi/p3cfgmgr.h \ @@ -264,6 +265,7 @@ SOURCES = \ ft/ftfilecreator.cc \ ft/ftdata.cc \ ft/ftfileprovider.cc \ + ft/ftdwlqueue.cc \ upnp/upnputil.c \ dht/opendhtmgr.cc \ upnp/upnphandler.cc \ diff --git a/libretroshare/src/rsiface/rsfiles.h b/libretroshare/src/rsiface/rsfiles.h index 62d3e9a1f..256594d44 100644 --- a/libretroshare/src/rsiface/rsfiles.h +++ b/libretroshare/src/rsiface/rsfiles.h @@ -107,6 +107,14 @@ virtual bool FileCancel(std::string hash) = 0; virtual bool FileControl(std::string hash, uint32_t flags) = 0; virtual bool FileClearCompleted() = 0; +/*** + * Control of Downloads Priority. + ***/ +virtual bool changePriority(const std::string hash, int priority) = 0; +virtual bool getPriority(const std::string hash, int & priority) = 0; +virtual bool clearDownload(const std::string hash) = 0; +virtual void clearQueue() = 0; + /*** * Download / Upload Details. ***/ @@ -114,7 +122,6 @@ virtual bool FileDownloads(std::list &hashs) = 0; virtual bool FileUploads(std::list &hashs) = 0; virtual bool FileDetails(std::string hash, uint32_t hintflags, FileInfo &info) = 0; - /*** * Extra List Access ***/ diff --git a/retroshare-gui/src/rsiface/RemoteDirModel.cpp b/retroshare-gui/src/rsiface/RemoteDirModel.cpp index b34a41427..3eb944e2f 100644 --- a/retroshare-gui/src/rsiface/RemoteDirModel.cpp +++ b/retroshare-gui/src/rsiface/RemoteDirModel.cpp @@ -765,7 +765,7 @@ void RemoteDirModel::downloadSelected(QModelIndexList list) /* if it is a dir, copy all files included*/ else if (details.type == DIR_TYPE_DIR) { - size_t prefixLen = details.path.rfind(details.name); + int prefixLen = details.path.rfind(details.name); if (prefixLen < 0) continue; downloadDirectory(details, prefixLen); } diff --git a/retroshare-gui/src/rsiface/rsfiles.h b/retroshare-gui/src/rsiface/rsfiles.h index 62d3e9a1f..256594d44 100644 --- a/retroshare-gui/src/rsiface/rsfiles.h +++ b/retroshare-gui/src/rsiface/rsfiles.h @@ -107,6 +107,14 @@ virtual bool FileCancel(std::string hash) = 0; virtual bool FileControl(std::string hash, uint32_t flags) = 0; virtual bool FileClearCompleted() = 0; +/*** + * Control of Downloads Priority. + ***/ +virtual bool changePriority(const std::string hash, int priority) = 0; +virtual bool getPriority(const std::string hash, int & priority) = 0; +virtual bool clearDownload(const std::string hash) = 0; +virtual void clearQueue() = 0; + /*** * Download / Upload Details. ***/ @@ -114,7 +122,6 @@ virtual bool FileDownloads(std::list &hashs) = 0; virtual bool FileUploads(std::list &hashs) = 0; virtual bool FileDetails(std::string hash, uint32_t hintflags, FileInfo &info) = 0; - /*** * Extra List Access ***/