From 339fcd53edf18067ed9c1e793b98f95149f75a83 Mon Sep 17 00:00:00 2001 From: alexandrut Date: Mon, 10 Aug 2009 19:01:27 +0000 Subject: [PATCH] add persistence to download queue git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@1510 b45a01b8-16f6-495d-af2f-9b41ad6348cc --- libretroshare/src/ft/ftcontroller.cc | 6 + libretroshare/src/ft/ftcontroller.h | 1 + libretroshare/src/ft/ftdwlqueue.cc | 275 ++++++++++++++++++++++++++- libretroshare/src/ft/ftdwlqueue.h | 65 ++++--- libretroshare/src/ft/ftserver.cc | 3 +- libretroshare/src/pqi/p3cfgmgr.h | 17 +- 6 files changed, 331 insertions(+), 36 deletions(-) diff --git a/libretroshare/src/ft/ftcontroller.cc b/libretroshare/src/ft/ftcontroller.cc index 1760fd639..9c0da0c25 100644 --- a/libretroshare/src/ft/ftcontroller.cc +++ b/libretroshare/src/ft/ftcontroller.cc @@ -523,6 +523,12 @@ bool ftController::activate() return true; } +bool ftController::isActiveAndNoPending() +{ + RsStackMutex stack(ctrlMutex); /******* LOCKED ********/ + return (mFtActive && mFtPendingDone); +} + bool ftController::handleAPendingRequest() { ftPendingRequest req; diff --git a/libretroshare/src/ft/ftcontroller.h b/libretroshare/src/ft/ftcontroller.h index 1662aa67e..82c54c331 100644 --- a/libretroshare/src/ft/ftcontroller.h +++ b/libretroshare/src/ft/ftcontroller.h @@ -117,6 +117,7 @@ class ftController: public CacheTransfer, public RsThread, public pqiMonitor, pu void setFtSearchNExtra(ftSearch *, ftExtraList *); void setTurtleRouter(p3turtle *) ; bool activate(); +bool isActiveAndNoPending(); void setShareDownloadDirectory(bool value); bool getShareDownloadDirectory(); diff --git a/libretroshare/src/ft/ftdwlqueue.cc b/libretroshare/src/ft/ftdwlqueue.cc index 2465bbdc7..3a217bb8a 100644 --- a/libretroshare/src/ft/ftdwlqueue.cc +++ b/libretroshare/src/ft/ftdwlqueue.cc @@ -7,13 +7,18 @@ #include "ftdwlqueue.h" #include "ftserver.h" +#include "serialiser/rsbaseserial.h" #include /*#define DEBUG_QUEUE 1*/ +#ifdef DEBUG_QUEUE +#include +#endif + ftDwlQueue::ftDwlQueue(ftController *ftc, unsigned int downloadLimit, unsigned int retryLimit) - : mFtController(ftc), downloadLimit(downloadLimit), retryLimit(retryLimit) { + : p3Config(CONFIG_TYPE_FT_DWLQUEUE), mFtController(ftc), downloadLimit(downloadLimit), retryLimit(retryLimit) { return; } @@ -35,6 +40,11 @@ void ftDwlQueue::run() sleep(1); #endif + /* first wait for files to start resume else + * downloads will not be correctly counted */ + + if (!mFtController->isActiveAndNoPending()) continue; + /* we have to know if the next download from * the queue will exceed the download limit */ @@ -61,6 +71,8 @@ void ftDwlQueue::run() priorities.sort(PriorityCompare()); } prmtx.unlock(); + + IndicateConfigChanged(); } } } @@ -106,11 +118,13 @@ void ftDwlQueue::insertDownload(const DwlDetails & details) { priorities.push_back(_details); priorities.sort(PriorityCompare()); + IndicateConfigChanged(); } } } else { priorities.push_back(_details); priorities.sort(PriorityCompare()); + IndicateConfigChanged(); } } @@ -124,6 +138,7 @@ bool ftDwlQueue::getNext(DwlDetails & details) { std::cerr << "ftDwlQueue::getNext() file: " << details.fname << " priority: " << details.priority << std::endl; #endif + IndicateConfigChanged(); return true; } @@ -159,6 +174,7 @@ bool ftDwlQueue::changePriority(const std::string hash, DwlPriority priority) { std::cerr << "ftDwlQueue::changePriority() file: " << hash << " new priority: " << it->priority << std::endl; #endif + IndicateConfigChanged(); return true; } @@ -196,6 +212,7 @@ bool ftDwlQueue::clearDownload(const std::string hash) { #ifdef DEBUG_QUEUE std::cerr << "ftDwlQueue::clearDownload() file: " << hash << std::endl; #endif + IndicateConfigChanged(); return true; } @@ -249,3 +266,259 @@ unsigned int ftDwlQueue::totalSystemDwl() { return totalDwl; } + +/*********************************************/ +/************ Serialisation ******************/ +/*********************************************/ + +RsDwlQueueItem::~RsDwlQueueItem() { + return; +} + +void RsDwlQueueItem::clear() { + file.TlvClear(); + allPeerIds.TlvClear(); + priority = 0; +} + +std::ostream &RsDwlQueueItem::print(std::ostream &out, uint16_t indent) { + printRsItemBase(out, "RsDwlQueueItem", indent); + + file.print(out, indent + 2); + allPeerIds.print(out, indent + 2); + + printIndent(out, indent + 2); + + switch (priority) { + case 0: + out << "priority: " << "Low" << std::endl; + break; + case 1: + out << "priority: " << "Normal" << std::endl; + break; + case 2: + out << "priority: " << "High" << std::endl; + break; + case 3: + out << "priority: " << "Auto" << std::endl; + break; + default: + out << "priority: " << "Auto" << std::endl; + break; + } + + printRsItemEnd(out, "RsDwlQueueItem", indent); + + return out; +} + +RsDwlQueueSerialiser::~RsDwlQueueSerialiser() { + return; +} + +uint32_t RsDwlQueueSerialiser::size(RsItem *i) { + RsDwlQueueItem *item; + uint32_t s = 0; + + if (NULL != (item = dynamic_cast(i))) { + s = 8; /* header */ + s += item->file.TlvSize(); /* file size */ + s += item->allPeerIds.TlvSize(); /* peers size */ + s += 4; /* priority size */ + } + + return s; +} + +bool RsDwlQueueSerialiser::serialise(RsItem *i, void *data, uint32_t *size) { + RsDwlQueueItem *item = (RsDwlQueueItem *) i; + uint32_t tlvsize = RsDwlQueueSerialiser::size(item); + uint32_t offset = 0; + + if (*size < tlvsize) { + return false; /* not enough space */ + } + + *size = tlvsize; + + bool ok = true; + + ok &= setRsItemHeader(data, tlvsize, item->PacketId(), tlvsize); + +#ifdef RSSERIAL_DEBUG + std::cerr << "RsDwlQueueSerialiser::serialise() Header: " << ok << std::endl; + std::cerr << "RsDwlQueueSerialiser::serialise() Size: " << size << std::endl; +#endif + + /* skip the header */ + offset += 8; + + ok &= item->file.SetTlv(data, tlvsize, &offset); + ok &= item->allPeerIds.SetTlv(data, tlvsize, &offset); + ok &= setRawUInt32(data, tlvsize, &offset, item->priority); + + if (offset !=tlvsize) { + ok = false; +#ifdef RSSERIAL_DEBUG + std::cerr << "RsDwlQueueSerialiser::serialise() Size Error! " << std::endl; +#endif + } + + return ok; +} + +RsItem *RsDwlQueueSerialiser::deserialise(void *data, uint32_t *size) { + /* get the type and size */ + uint32_t rstype = getRsItemId(data); + uint32_t rssize = getRsItemSize(data); + + uint32_t offset = 0; + + if ((RS_PKT_VERSION1 != getRsItemVersion(rstype)) || + (RS_PKT_CLASS_CONFIG != getRsItemClass(rstype)) || + (RS_PKT_TYPE_QUEUE_CONFIG != getRsItemType(rstype)) || + (RS_PKT_SUBTYPE_FILE_ITEM != getRsItemSubType(rstype))) { + return NULL; /* wrong type */ + } + + if (*size < rssize) { + return NULL; /* not enough data */ + } + + /* set the packet length */ + *size = rssize; + + bool ok = true; + + /* ready to load */ + RsDwlQueueItem *item = new RsDwlQueueItem(); + item->clear(); + + /* skip the header */ + offset += 8; + + ok &= item->file.GetTlv(data, rssize, &offset); + ok &= item->allPeerIds.GetTlv(data, rssize, &offset); + ok &= getRawUInt32(data, rssize, &offset, &item->priority); + + if (offset != rssize) { +#ifdef RSSERIAL_DEBUG + std::cerr << "RsDwlQueueSerialiser::deserialise() offset != rssize" << std::endl; +#endif + delete item; + return NULL; + } + + if (!ok) { +#ifdef RSSERIAL_DEBUG + std::cerr << "RsDwlQueueSerialiser::deserialise() ok = false" << std::endl; +#endif + delete item; + return NULL; + } + + return item; +} + +/*********************************************/ +/* p3Config interface methods implementation */ +/*********************************************/ + +RsSerialiser *ftDwlQueue::setupSerialiser() { + RsSerialiser *rss = new RsSerialiser(); + rss->addSerialType(new RsDwlQueueSerialiser()); + + return rss; +} + +std::list ftDwlQueue::saveList(bool &cleanup) { +#ifdef DEBUG_QUEUE + std::cerr << "ftDwlQueue::saveList()" << std::endl; +#endif + RsStackMutex stack(prmtx); + + cleanup = true; + std::list result; + + std::list::iterator it; + for (it = priorities.begin(); it != priorities.end(); it ++) { + RsDwlQueueItem *item = new RsDwlQueueItem(); + + RsTlvFileItem file; + file.filesize = it->count; + file.hash = it->hash; + file.name = it->fname; + file.path = it->dest; + + RsTlvPeerIdSet allPeerIds; + allPeerIds.ids = it->srcIds; + + item->file = file; + item->allPeerIds = allPeerIds; + item->priority = it->priority; + +#ifdef DEBUG_QUEUE + std::list::iterator dit; + std::cerr << "ftDwlQueue::saveList - save item(" + << it->fname << "," + << it->hash << "," + << it->count << "," + << it->dest << "," + << it->flags << ",<"; + + for(dit = it->srcIds.begin(); dit != it->srcIds.end(); dit ++) { + std::cerr << *dit << ","; + } + std::cerr << ">)"; + std::cerr << std::endl; +#endif + result.push_back(item); + } + + return result; +} + +bool ftDwlQueue::loadList(std::list load) { +#ifdef DEBUG_QUEUE + std::cerr << "ftDwlQueue::loadList()" << std::endl; +#endif + RsStackMutex stack(prmtx); + + priorities.clear(); + + std::list::iterator it; + for (it = load.begin(); it != load.end(); it ++) { + RsDwlQueueItem *item = dynamic_cast(*it); +#ifdef DEBUG_QUEUE + assert(item != NULL); +#endif + if (!item) continue; + + DwlDetails details(item->file.name, item->file.hash, item->file.filesize, item->file.path, + 0, item->allPeerIds.ids, (DwlPriority) item->priority); + +#ifdef DEBUG_QUEUE + std::list::iterator dit; + std::cerr << "ftDwlQueue::loadList - load item(" + << details.fname << "," + << details.hash << "," + << details.count << "," + << details.dest << "," + << details.flags << ",<"; + + for(dit = details.srcIds.begin(); dit != details.srcIds.end(); dit ++) { + std::cerr << *dit << ","; + } + std::cerr << ">)"; + std::cerr << std::endl; +#endif + priorities.push_back(details); + + delete item; + } + + /* not sure if necessary, list should be already sorted */ + priorities.sort(PriorityCompare()); + + return true; +} diff --git a/libretroshare/src/ft/ftdwlqueue.h b/libretroshare/src/ft/ftdwlqueue.h index 235eac1b7..d86235d49 100644 --- a/libretroshare/src/ft/ftdwlqueue.h +++ b/libretroshare/src/ft/ftdwlqueue.h @@ -14,31 +14,38 @@ #include #include -//enum DwlPriority { Low = 0, Normal, High, Auto }; -// -///* class which encapsulates download details */ -//class DwlDetails { -//public: -// DwlDetails() { return; } -// DwlDetails(std::string fname, std::string hash, int count, std::string dest, -// uint32_t flags, std::list srcIds, DwlPriority priority) -// : fname(fname), hash(hash), count(count), dest(dest), flags(flags), -// srcIds(srcIds), priority(priority), retries(0) { return; } -// -// /* download details */ -// std::string fname; -// std::string hash; -// int count; -// std::string dest; -// uint32_t flags; -// std::list srcIds; -// -// /* internally used in download queue */ -// DwlPriority priority; -// -// /* how many times a failed dwl will be requeued */ -// unsigned int retries; -//}; +const uint8_t RS_PKT_TYPE_QUEUE_CONFIG = 0x05; + +class RsDwlQueueItem: public RsItem { +public: + RsDwlQueueItem() + : RsItem(RS_PKT_VERSION1, RS_PKT_CLASS_CONFIG, RS_PKT_TYPE_QUEUE_CONFIG, RS_PKT_SUBTYPE_FILE_ITEM) { + return; + } + + virtual ~RsDwlQueueItem(); + + virtual void clear(); + virtual std::ostream &print(std::ostream &out, uint16_t indent = 0); + + RsTlvFileItem file; + RsTlvPeerIdSet allPeerIds; + uint32_t priority; +}; + +class RsDwlQueueSerialiser: public RsSerialType { +public: + RsDwlQueueSerialiser() + : RsSerialType(RS_PKT_VERSION1, RS_PKT_CLASS_CONFIG, RS_PKT_TYPE_QUEUE_CONFIG) { + return; + } + + virtual ~RsDwlQueueSerialiser(); + + virtual uint32_t size(RsItem *); + virtual bool serialise (RsItem *item, void *data, uint32_t *size); + virtual RsItem * deserialise(void *data, uint32_t *size); +}; /* comparator class used when sorting list */ class PriorityCompare { @@ -72,7 +79,7 @@ public: /* general class for download queue which * contains the a download priority list */ -class ftDwlQueue : public DwlQueue, public RsThread { +class ftDwlQueue : public DwlQueue, public RsThread, public p3Config { public: ftDwlQueue(ftController *ftc, unsigned int downloadLimit = 7, unsigned int retryLimit = 10); virtual ~ftDwlQueue(); @@ -90,6 +97,12 @@ public: virtual void clearQueue(); virtual void getDwlDetails(std::list & details); + /* from p3 config interface */ +protected: + virtual RsSerialiser *setupSerialiser(); + virtual std::list saveList(bool &cleanup); + virtual bool loadList(std::list load); + private: unsigned int downloadLimit; unsigned int retryLimit; diff --git a/libretroshare/src/ft/ftserver.cc b/libretroshare/src/ft/ftserver.cc index fd877f912..eaa365c75 100644 --- a/libretroshare/src/ft/ftserver.cc +++ b/libretroshare/src/ft/ftserver.cc @@ -497,7 +497,7 @@ bool ftServer::addSharedDirectory(SharedDirInfo dir) for(std::list::const_iterator it(dirList.begin());it!=dirList.end();++it) if((*it).filename == dir.filename) return false ; - + // ok then, add the shared directory. dirList.push_back(dir); @@ -965,6 +965,7 @@ bool ftServer::addConfiguration(p3ConfigMgr *cfgmgr) cfgmgr->addConfiguration("ft_shared.cfg", mFiMon); cfgmgr->addConfiguration("ft_extra.cfg", mFtExtra); cfgmgr->addConfiguration("ft_transfers.cfg", mFtController); + cfgmgr->addConfiguration("ft_dwlqueue.cfg", mFtDwlQueue); return true; } diff --git a/libretroshare/src/pqi/p3cfgmgr.h b/libretroshare/src/pqi/p3cfgmgr.h index 0c7ef72d7..25a59ee70 100644 --- a/libretroshare/src/pqi/p3cfgmgr.h +++ b/libretroshare/src/pqi/p3cfgmgr.h @@ -47,7 +47,7 @@ * * At top level we need: * - * - type / filename / size / hash - + * - type / filename / size / hash - * and the file signed... * * @@ -68,6 +68,7 @@ const uint32_t CONFIG_TYPE_CACHE_OLDID = 0x0005; const uint32_t CONFIG_TYPE_FT_SHARED = 0x0007; const uint32_t CONFIG_TYPE_FT_EXTRA_LIST= 0x0008; const uint32_t CONFIG_TYPE_FT_CONTROL = 0x0009; +const uint32_t CONFIG_TYPE_FT_DWLQUEUE = 0x000A; /* turtle router */ const uint32_t CONFIG_TYPE_TURTLE = 0x0020; @@ -93,7 +94,7 @@ class p3AuthMgr; class pqiConfig { - public: + public: pqiConfig(uint32_t t); virtual ~pqiConfig(); @@ -107,13 +108,13 @@ std::string Hash(); protected: void IndicateConfigChanged(); -void setHash(std::string h); +void setHash(std::string h); RsMutex cfgMtx; private: -void setFilename(std::string name); +void setFilename(std::string name); bool HasConfigChanged(uint16_t idx); Indicator ConfInd; @@ -122,7 +123,7 @@ bool HasConfigChanged(uint16_t idx); std::string filename; std::string hash; - friend class p3ConfigMgr; + friend class p3ConfigMgr; /* so it can access: * setFilename() and HasConfigChanged() */ @@ -147,7 +148,7 @@ void addConfiguration(std::string file, pqiConfig *conf); /* saves config, and disables further saving * used for exiting the system */ -void completeConfiguration(); +void completeConfiguration(); private: @@ -185,7 +186,7 @@ virtual bool loadList(std::list load) = 0; * callback for mutex unlocking * in derived classes (should only be needed if cleanup = false) */ -virtual void saveDone() { return; } +virtual void saveDone() { return; } }; /* end of p3Config */ @@ -214,7 +215,7 @@ std::map settings; - +