From 80f46861003284a17fc7c644c855b6e09ed1157a Mon Sep 17 00:00:00 2001 From: drbob Date: Tue, 4 Nov 2008 23:12:53 +0000 Subject: [PATCH] Bugfixes and extra debugging for file transfer. git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@791 b45a01b8-16f6-495d-af2f-9b41ad6348cc --- libretroshare/src/dbase/cachestrapper.cc | 19 +- libretroshare/src/dbase/findex.cc | 9 + libretroshare/src/dbase/fistore.cc | 12 + libretroshare/src/ft/ftcontroller.cc | 38 ++- libretroshare/src/ft/ftdatamultiplex.cc | 59 ++++ libretroshare/src/ft/ftserver3test.cc | 78 +---- libretroshare/src/ft/fttransfermodule.cc | 319 ++++++++++-------- libretroshare/src/ft/fttransfermodule.h | 15 +- libretroshare/src/ft/pqitestor.cc | 2 - libretroshare/src/scripts/config.mk | 4 +- libretroshare/src/serialiser/rstlvfileitem.cc | 6 + libretroshare/src/serialiser/rstlvtypes.h | 2 +- 12 files changed, 333 insertions(+), 230 deletions(-) diff --git a/libretroshare/src/dbase/cachestrapper.cc b/libretroshare/src/dbase/cachestrapper.cc index ee740a4f0..48145b66e 100644 --- a/libretroshare/src/dbase/cachestrapper.cc +++ b/libretroshare/src/dbase/cachestrapper.cc @@ -34,6 +34,7 @@ * #define CS_DEBUG 1 ***/ +#define CS_DEBUG 1 bool operator<(const CacheId &a, const CacheId &b) { @@ -209,7 +210,7 @@ CacheStore::CacheStore(uint16_t t, bool m, void CacheStore::lockData() const { #ifdef CS_DEBUG - std::cerr << "CacheStore::lockData()" << std::endl; +// std::cerr << "CacheStore::lockData()" << std::endl; #endif cMutex.lock(); } @@ -217,7 +218,7 @@ void CacheStore::lockData() const void CacheStore::unlockData() const { #ifdef CS_DEBUG - std::cerr << "CacheStore::unlockData()" << std::endl; +// std::cerr << "CacheStore::unlockData()" << std::endl; #endif cMutex.unlock(); } @@ -1050,15 +1051,29 @@ bool CacheTransfer::CompletedCache(std::string hash) std::map::iterator dit; std::map::iterator sit; +#ifdef CS_DEBUG + std::cerr << "CacheTransfer::CompletedCache(" << hash << ")"; + std::cerr << std::endl; +#endif + /* find in store.... */ sit = cbStores.find(hash); dit = cbData.find(hash); if ((sit == cbStores.end()) || (dit == cbData.end())) { +#ifdef CS_DEBUG + std::cerr << "CacheTransfer::CompletedCache() Failed to find it"; + std::cerr << std::endl; +#endif + return false; } +#ifdef CS_DEBUG + std::cerr << "CacheTransfer::CompletedCache() callback to store"; + std::cerr << std::endl; +#endif /* callback */ (sit -> second) -> downloadedCache(dit->second); diff --git a/libretroshare/src/dbase/findex.cc b/libretroshare/src/dbase/findex.cc index 7bac01f47..2c4b74ca0 100644 --- a/libretroshare/src/dbase/findex.cc +++ b/libretroshare/src/dbase/findex.cc @@ -963,6 +963,10 @@ int DirEntry::saveEntry(std::ostringstream &oss) int FileIndex::searchHash(std::string hash, std::list &results) const { +#ifdef FI_DEBUG + std::cerr << "FileIndex::searchHash(" << hash << ")"; + std::cerr << std::endl; +#endif DirEntry *ndir = NULL; std::list dirlist; dirlist.push_back(root); @@ -985,6 +989,11 @@ int FileIndex::searchHash(std::string hash, std::list &results) con if (hash == (fit->second)->hash) { results.push_back(fit->second); +#ifdef FI_DEBUG + std::cerr << "FileIndex::searchHash(" << hash << ")"; + std::cerr << " found: " << fit->second->name; + std::cerr << std::endl; +#endif } } } diff --git a/libretroshare/src/dbase/fistore.cc b/libretroshare/src/dbase/fistore.cc index 84e7d75c8..993fec97f 100644 --- a/libretroshare/src/dbase/fistore.cc +++ b/libretroshare/src/dbase/fistore.cc @@ -44,6 +44,8 @@ FileIndexStore::~FileIndexStore() * #define FIS_DEBUG 1 **/ +#define FIS_DEBUG 1 + /* actual load, once data available */ int FileIndexStore::loadCache(const CacheData &data) { @@ -353,6 +355,10 @@ int FileIndexStore::SearchHash(std::string hash, std::list &results) #endif for(pit = indices.begin(); pit != indices.end(); pit++) { +#ifdef FIS_DEBUG + std::cerr << "FileIndexStore::SearchHash() Searching: Peer "; + std::cerr << pit->first << std::endl; +#endif firesults.clear(); (pit->second)->searchHash(hash, firesults); @@ -373,6 +379,12 @@ int FileIndexStore::SearchHash(std::string hash, std::list &results) } + +#ifdef FIS_DEBUG + std::cerr << "FileIndexStore::SearchHash() Found " << results.size(); + std::cerr << " Results from " << indices.size() << " Peers" << std::endl; +#endif + unlockData(); return results.size(); } diff --git a/libretroshare/src/ft/ftcontroller.cc b/libretroshare/src/ft/ftcontroller.cc index 5fe80b607..2020ce1e8 100644 --- a/libretroshare/src/ft/ftcontroller.cc +++ b/libretroshare/src/ft/ftcontroller.cc @@ -218,23 +218,50 @@ bool ftController::completeFile(std::string hash) /* If it has a callback - do it now */ if (fc->mDoCallback) { +#ifdef CONTROL_DEBUG + std::cerr << "ftController::completeFile() doing Callback"; + std::cerr << std::endl; +#endif + switch (fc->mCallbackCode) { case CB_CODE_CACHE: /* callback */ if (fc->mState == ftFileControl::COMPLETED) { +#ifdef CONTROL_DEBUG + std::cerr << "ftController::completeFile() doing Callback : Success"; + std::cerr << std::endl; +#endif + CompletedCache(fc->mHash); } else { +#ifdef CONTROL_DEBUG + std::cerr << "ftController::completeFile() Cache Callback : Failed"; + std::cerr << std::endl; +#endif FailedCache(fc->mHash); } break; case CB_CODE_MEDIA: +#ifdef CONTROL_DEBUG + std::cerr << "ftController::completeFile() NULL MEDIA callback"; + std::cerr << std::endl; +#endif break; } } + else + { +#ifdef CONTROL_DEBUG + std::cerr << "ftController::completeFile() No callback"; + std::cerr << std::endl; +#endif + + + } /* switch map */ @@ -249,6 +276,8 @@ bool ftController::completeFile(std::string hash) /********************** Controller Access **********************/ /***************************************************************/ +const uint32_t FT_CNTRL_STANDARD_RATE = 100 * 1024; + bool ftController::FileRequest(std::string fname, std::string hash, uint64_t size, std::string dest, uint32_t flags, std::list &srcIds) @@ -378,7 +407,8 @@ bool ftController::FileRequest(std::string fname, std::string hash, #endif //tm->setPeerState(*it, RS_FILE_RATE_FAST | // RS_FILE_PEER_ONLINE, 100000); - tm->setPeerState(*it, PQIPEER_IDLE, 10000); + //tm->setPeerState(*it, PQIPEER_IDLE, 10000); + tm->setPeerState(*it, PQIPEER_IDLE, FT_CNTRL_STANDARD_RATE); } else if (mConnMgr->isOnline(*it)) { @@ -389,7 +419,8 @@ bool ftController::FileRequest(std::string fname, std::string hash, #endif //tm->setPeerState(*it, RS_FILE_RATE_TRICKLE | // RS_FILE_PEER_ONLINE, 10000); - tm->setPeerState(*it, PQIPEER_IDLE, 10000); + //tm->setPeerState(*it, PQIPEER_IDLE, 10000); + tm->setPeerState(*it, PQIPEER_IDLE, FT_CNTRL_STANDARD_RATE); } else { @@ -399,7 +430,7 @@ bool ftController::FileRequest(std::string fname, std::string hash, std::cerr << std::endl; #endif //tm->setPeerState(*it, RS_FILE_PEER_OFFLINE, 10000); - tm->setPeerState(*it, PQIPEER_NOT_ONLINE, 10000); + tm->setPeerState(*it, PQIPEER_NOT_ONLINE, FT_CNTRL_STANDARD_RATE); } } @@ -412,6 +443,7 @@ bool ftController::FileRequest(std::string fname, std::string hash, } + bool ftController::FileCancel(std::string hash) { #ifdef CONTROL_DEBUG diff --git a/libretroshare/src/ft/ftdatamultiplex.cc b/libretroshare/src/ft/ftdatamultiplex.cc index 89fdac348..11b64d9bd 100644 --- a/libretroshare/src/ft/ftdatamultiplex.cc +++ b/libretroshare/src/ft/ftdatamultiplex.cc @@ -44,6 +44,8 @@ const double DMULTIPLEX_RELAX = 0.5; /* ??? */ * #define MPLEX_DEBUG 1 *****/ +#define MPLEX_DEBUG 1 + ftClient::ftClient(ftTransferModule *module, ftFileCreator *creator) :mModule(module), mCreator(creator) { @@ -120,11 +122,24 @@ bool ftDataMultiplex::FileDownloads(std::list &hashs) bool ftDataMultiplex::FileDetails(std::string hash, uint32_t hintsflag, FileInfo &info) { +#ifdef MPLEX_DEBUG + std::cerr << "ftDataMultiplex::FileDetails("; + std::cerr << hash << ", " << hintsflag << ")"; + std::cerr << std::endl; +#endif + RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/ std::map::iterator sit; sit = mServers.find(hash); if (sit != mServers.end()) { + +#ifdef MPLEX_DEBUG + std::cerr << "ftDataMultiplex::FileDetails()"; + std::cerr << " Found ftFileProvider!"; + std::cerr << std::endl; +#endif + (sit->second)->FileDetails(info); return true; } @@ -132,10 +147,23 @@ bool ftDataMultiplex::FileDetails(std::string hash, uint32_t hintsflag, FileI std::map::iterator cit; if (mClients.end() != (cit = mClients.find(hash))) { + +#ifdef MPLEX_DEBUG + std::cerr << "ftDataMultiplex::FileDetails()"; + std::cerr << " Found ftFileCreator!"; + std::cerr << std::endl; +#endif + //(cit->second).mModule->FileDetails(info); (cit->second).mCreator->FileDetails(info); return true; } + +#ifdef MPLEX_DEBUG + std::cerr << "ftDataMultiplex::FileDetails()"; + std::cerr << " Found nothing"; + std::cerr << std::endl; +#endif return false; } @@ -410,6 +438,7 @@ bool ftDataMultiplex::locked_handleServerRequest(ftFileProvider *provider, std::cerr << " FAILED"; std::cerr << std::endl; #endif + free(data); return false; } @@ -420,6 +449,13 @@ bool ftDataMultiplex::handleSearchRequest(std::string peerId, uint64_t offset, uint32_t chunksize) { + +#ifdef MPLEX_DEBUG + std::cerr << "ftDataMultiplex::handleSearchRequest("; + std::cerr << peerId << ", " << hash << ", " << size << "...)"; + std::cerr << std::endl; +#endif + { RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/ @@ -427,6 +463,13 @@ bool ftDataMultiplex::handleSearchRequest(std::string peerId, std::map::iterator bit; if (mUnknownHashs.end() != (bit = mUnknownHashs.find(hash))) { + +#ifdef MPLEX_DEBUG + std::cerr << "ftDataMultiplex::handleSearchRequest("; + std::cerr << " Found Ignore Hash ... done"; + std::cerr << std::endl; +#endif + /* We've previously rejected this one, so ignore */ return false; } @@ -439,6 +482,7 @@ bool ftDataMultiplex::handleSearchRequest(std::string peerId, * (anywhere but remote really) */ + FileInfo info; uint32_t hintflags = (RS_FILE_HINTS_CACHE | RS_FILE_HINTS_EXTRA | @@ -448,6 +492,13 @@ bool ftDataMultiplex::handleSearchRequest(std::string peerId, if (mSearch->search(hash, size, hintflags, info)) { +#ifdef MPLEX_DEBUG + std::cerr << "ftDataMultiplex::handleSearchRequest("; + std::cerr << " Found Local File, sharing..."; + std::cerr << std::endl; +#endif + + /* setup a new provider */ RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/ @@ -459,6 +510,14 @@ bool ftDataMultiplex::handleSearchRequest(std::string peerId, /* handle request finally */ locked_handleServerRequest(provider, peerId, hash, size, offset, chunksize); + + + /* now we should should check if any further requests for the same + * file exists ... (can happen with caches!) + * + * but easier to check pre-search.... + */ + return true; } return false; diff --git a/libretroshare/src/ft/ftserver3test.cc b/libretroshare/src/ft/ftserver3test.cc index 7f1948790..045da5976 100644 --- a/libretroshare/src/ft/ftserver3test.cc +++ b/libretroshare/src/ft/ftserver3test.cc @@ -352,6 +352,7 @@ INITTEST(); void *do_server_test_thread(void *data) { TestData *mFt = (TestData *) data; + time_t startTS = time(NULL); std::cerr << "do_server_test_thread() running"; std::cerr << std::endl; @@ -368,9 +369,10 @@ void *do_server_test_thread(void *data) } - for(int i = 0; i < 60; i++) + for(int i = 0; i < 90; i++) { - std::cerr << "Waiting 60 seconds to share caches"; + int age = time(NULL) - startTS; + std::cerr << "Waited " << age << " seconds to share caches"; std::cerr << std::endl; sleep(1); } @@ -436,7 +438,8 @@ void *do_server_test_thread(void *data) /*** Now Download it! ***/ std::list srcIds; //srcIds.push_back(sFile.id); - srcIds.push_back(oId); + // Don't add srcId - to test whether the search works - or not + //srcIds.push_back(oId); if (foundFile) { mFt->loadServer->FileRequest(sFile.name, sFile.hash, @@ -444,75 +447,14 @@ void *do_server_test_thread(void *data) } /* Give it a while to transfer */ - for(int i = 0; i < 300; i++) + for(int i = 0; i < 100; i++) { - std::cerr << "Waited " << i * 10 << " seconds for transfer"; + int age = time(NULL) - startTS; + std::cerr << "Waited " << age << " seconds for tranfer"; std::cerr << std::endl; - sleep(10); + sleep(1); } -#if 0 - bool - while(!mFt->loadServer->ExtraFileStatus(*eit, info)) - { - - /* max of 30 seconds */ - now = time(NULL); - if (now - start > 30) - { - /* FAIL */ - REPORT2( false, "Extra File Hashing"); - } - - sleep(1); - } - - /* Got ExtraFileStatus */ - REPORT("Successfully Found ExtraFile"); - - /* now we can try a search (should succeed) */ - uint32_t hintflags = 0; - if (mFt->loadServer->FileDetails(info.hash, hintflags, info2)) - { - CHECK(info2.hash == info.hash); - CHECK(info2.size == info.size); - CHECK(info2.fname == info.fname); - } - else - { - REPORT2( false, "Search for Extra File (Basic)"); - } - - /* search with flags (should succeed) */ - hintflags = RS_FILE_HINTS_EXTRA; - if (mFt->loadServer->FileDetails(info.hash, hintflags, info2)) - { - CHECK(info2.hash == info.hash); - CHECK(info2.size == info.size); - CHECK(info2.fname == info.fname); - } - else - { - REPORT2( false, "Search for Extra File (Extra Flag)"); - } - - /* search with other flags (should fail) */ - hintflags = RS_FILE_HINTS_REMOTE | RS_FILE_HINTS_SPEC_ONLY; - if (mFt->loadServer->FileDetails(info.hash, hintflags, info2)) - { - REPORT2( false, "Search for Extra File (Fail Flags)"); - } - else - { - REPORT("Search for Extra File (Fail Flags)"); - } - - /* if we try to download it ... should just find existing one*/ - - REPORT("Testing with Extra File"); - } -#endif - FINALREPORT("Shared Directories, Bool Search, multi-source transfers"); exit(1); } diff --git a/libretroshare/src/ft/fttransfermodule.cc b/libretroshare/src/ft/fttransfermodule.cc index b5fed097f..7b8c54d1c 100644 --- a/libretroshare/src/ft/fttransfermodule.cc +++ b/libretroshare/src/ft/fttransfermodule.cc @@ -31,6 +31,24 @@ #include "fttransfermodule.h" +/************************************************************************* + * Notes on file transfer strategy. + * Care must be taken not to overload pipe. best way is to time requests. + * and according adjust data rate. + * + * each peer gets a 'max_rate' which is decided on the type of transfer. + * - trickle ... + * - stream ... + * - max ... + * + * Each peer is independently managed. + * + * via the functions: + * + */ + +const double FT_TM_MAX_PEER_RATE = 1024 * 1024; /* 1MB/s */ + ftTransferModule::ftTransferModule(ftFileCreator *fc, ftDataMultiplex *dm, ftController *c) :mFileCreator(fc), mMultiplexor(dm), mFtController(c), mFlag(0) { @@ -43,6 +61,8 @@ ftTransferModule::ftTransferModule(ftFileCreator *fc, ftDataMultiplex *dm, ftCon // Dummy for Testing (should be handled independantly for // each peer. //mChunkSize = 10000; + desiredRate = 1000000; /* 1MB/s ??? */ + actualRate = 0; return; } @@ -108,10 +128,10 @@ bool ftTransferModule::setPeerState(std::string peerId,uint32_t state,uint32_t m (mit->second).state=state; (mit->second).desiredRate=maxRate; + (mit->second).actualRate=maxRate; /* should give big kick in right direction */ std::list::iterator it; - it=mOnlinePeers.begin(); - while((it!=mOnlinePeers.end())&&(*it!=peerId)) it++; + it = std::find(mOnlinePeers.begin(), mOnlinePeers.end(), peerId); if (state!=PQIPEER_NOT_ONLINE) { @@ -180,48 +200,30 @@ bool ftTransferModule::recvFileData(std::string peerId, uint64_t offset, std::cerr << std::endl; #endif + bool ok = false; + { RsStackMutex stack(tfMtx); /******* STACK LOCKED ******/ - std::map::iterator mit; - mit = mFileSources.find(peerId); + std::map::iterator mit; + mit = mFileSources.find(peerId); - if (mit == mFileSources.end()) - { + if (mit == mFileSources.end()) + { #ifdef FT_DEBUG - std::cerr << "ftTransferModule::recvFileData()"; - std::cerr << " peer not found in sources"; - std::cerr << std::endl; + std::cerr << "ftTransferModule::recvFileData()"; + std::cerr << " peer not found in sources"; + std::cerr << std::endl; #endif - return false; - } + return false; + } + ok = locked_recvPeerData(mit->second, offset, chunk_size, data); - if ((mit->second).state != PQIPEER_DOWNLOADING) - { -#ifdef FT_DEBUG - std::cerr << "ftTransferModule::recvFileData()"; - std::cerr << " peer not downloading???"; - std::cerr << std::endl; -#endif - //return false; - } - - if (offset != ((mit->second).offset + (mit->second).receivedSize)) - { - //fix me - //received data not expected -#ifdef FT_DEBUG - std::cerr << "ftTransferModule::recvFileData()"; - std::cerr << " offset != offset + recvdSize"; - std::cerr << std::endl; -#endif - return false; - } - - (mit->second).receivedSize += chunk_size; - (mit->second).state = PQIPEER_IDLE; } /***** STACK MUTEX END ****/ - return storeData(offset, chunk_size, data); + + if (ok) + storeData(offset, chunk_size, data); + return ok; } void ftTransferModule::requestData(std::string peerId, uint64_t offset, uint32_t chunk_size) @@ -298,96 +300,22 @@ bool ftTransferModule::queryInactive() std::cerr << "ftTransferModule::queryInactive()" << std::endl; #endif - if (mFileStatus.stat == ftFileStatus::PQIFILE_INIT) - mFileStatus.stat = ftFileStatus::PQIFILE_DOWNLOADING; + if (mFileStatus.stat == ftFileStatus::PQIFILE_INIT) + mFileStatus.stat = ftFileStatus::PQIFILE_DOWNLOADING; - if (mFileStatus.stat != ftFileStatus::PQIFILE_DOWNLOADING) - { - if (mFileStatus.stat == ftFileStatus::PQIFILE_FAIL_CANCEL) - mFlag = 2; //file canceled by user - return false; - } - - int ts = time(NULL); - uint64_t req_offset; - uint32_t req_size; - int delta; - - std::map::iterator mit; - for(mit = mFileSources.begin(); mit != mFileSources.end(); mit++) - { - std::string peerId = mit->first; - peerInfo* pInfo = &mit->second; - switch (pInfo->state) - { - //Peer side has change from online to offline during transfer - case PQIPEER_NOT_ONLINE: - break; - - //file request has been sent to peer side, but no response received yet - case PQIPEER_DOWNLOADING: - if (ts - (pInfo->lastTS) < PQIPEER_DOWNLOAD_CHECK) - { - /* if not timed out yet.... ignore */ - actualRate += pInfo->actualRate; - break; + if (mFileStatus.stat != ftFileStatus::PQIFILE_DOWNLOADING) + { + if (mFileStatus.stat == ftFileStatus::PQIFILE_FAIL_CANCEL) + mFlag = 2; //file canceled by user + return false; } - /* otherwise fall through to request it again (with getChunk); - */ - - //file response received or peer side is just ready for download - case PQIPEER_IDLE: - pInfo->actualRate = pInfo->chunkSize/(ts-(pInfo->lastTS)); - - if (pInfo->actualRate < pInfo->desiredRate) - { - if (pInfo->actualRate < pInfo->desiredRate/2) - { - req_size = pInfo->chunkSize * 2 ; - } - else - { - req_size = (uint32_t ) (pInfo->chunkSize * 1.1) ; - } - } - else - { - req_size = (uint32_t ) (pInfo->chunkSize * 0.9) ; - } - - if (getChunk(req_offset,req_size)) - { - if (req_size > 0) - { - pInfo->offset = req_offset; - pInfo->chunkSize = req_size; - pInfo->lastTS = ts; - pInfo->state = PQIPEER_DOWNLOADING; - pInfo->receivedSize = 0; - requestData(peerId,req_offset,req_size); - } - else - { - std::cerr << "transfermodule::Waiting for data to be available"; - std::cerr << std::endl; - } - } - else mFlag = 1; - - actualRate += pInfo->actualRate; - break; - - //file transfer has been stopped - case PQIPEER_SUSPEND: - break; - - default: - break; - }//switch - }//for - - return true; + std::map::iterator mit; + for(mit = mFileSources.begin(); mit != mFileSources.end(); mit++) + { + locked_tickPeerTransfer(mit->second); + } + return true; } bool ftTransferModule::pauseTransfer() @@ -442,11 +370,28 @@ bool ftTransferModule::completeFileTransfer() int ftTransferModule::tick() { #ifdef FT_DEBUG + { + RsStackMutex stack(tfMtx); /******* STACK LOCKED ******/ + std::cerr << "ftTransferModule::tick()"; std::cerr << " mFlag: " << mFlag; + std::cerr << " mHash: " << mHash; + std::cerr << " mSize: " << mSize; std::cerr << std::endl; + + std::cerr << "Peers: "; + std::map::iterator it; + for(it = mFileSources.begin(); it != mFileSources.end(); it++) + { + std::cerr << " " << it->first; + } + std::cerr << std::endl; + + + } #endif + queryInactive(); uint32_t flags = 0; @@ -480,43 +425,119 @@ void ftTransferModule::adjustSpeed() std::map::iterator mit; -#ifdef FT_DEBUG - std::cerr << "ftTransferModule::adjustSpeed()"; - std::cerr << " Initial Desired Rate: " << desiredRate << " Actual Rate: " << actualRate; - std::cerr << std::endl; -#endif - - + actualRate = 0; for(mit = mFileSources.begin(); mit != mFileSources.end(); mit++) { - if (((mit->second).state == PQIPEER_DOWNLOADING) - || ((mit->second).state == PQIPEER_IDLE)) - { - #ifdef FT_DEBUG std::cerr << "ftTransferModule::adjustSpeed()"; - std::cerr << "\t" << mit->first << " Desired Rate: " << desiredRate << " Actual Rate: " << actualRate; + std::cerr << "Peer: " << mit->first; + std::cerr << " Desired Rate: " << (mit->second).desiredRate; + std::cerr << " Actual Rate: " << (mit->second).actualRate; std::cerr << std::endl; #endif - - if ((actualRate < desiredRate) && ((mit->second).actualRate >= (mit->second).desiredRate)) - { - (mit->second).desiredRate *= 1.1; - } - - if ((actualRate > desiredRate) && ((mit->second).actualRate < (mit->second).desiredRate)) - { - (mit->second).desiredRate *= 0.9; - } - } + actualRate += mit->second.actualRate; } + #ifdef FT_DEBUG - std::cerr << "ftTransferModule::adjustSpeed()"; - std::cerr << " Initial Desired Rate: " << desiredRate << " Actual Rate: " << actualRate; + std::cerr << "ftTransferModule::adjustSpeed() Totals:"; + std::cerr << "Desired Rate: " << desiredRate << " Actual Rate: " << actualRate; std::cerr << std::endl; #endif - return; + + return; } +/******************************************************************************* + * Actual Peer Transfer Management Code. + * + * request very tick, at rate + * + * + **/ + +const uint32_t FT_TM_MINIMUM_CHUNK = 1024; /* ie 1Kb / sec */ +const uint32_t FT_TM_RESTART_DOWNLOAD = 60; /* 60 seconds */ +const uint32_t FT_TM_DOWNLOAD_TIMEOUT = 5; /* 5 seconds */ + +bool ftTransferModule::locked_tickPeerTransfer(peerInfo &info) +{ + /* how long has it been? */ + time_t ts = time(NULL); + + int ageRecv = ts - info.recvTS; + int ageReq = ts - info.lastTS; + + if (ageReq > FT_TM_RESTART_DOWNLOAD) + { + info.state = PQIPEER_DOWNLOADING; + info.recvTS = ts; /* reset to activate */ + ageRecv = 0; + } + + if (ageRecv > FT_TM_DOWNLOAD_TIMEOUT) + { + info.state = PQIPEER_IDLE; + return false; + } + + /* update rate */ + info.actualRate = info.actualRate * 0.75 + 0.25 * info.lastTransfers; + info.lastTransfers = 0; + + /* request at 10% more than actual rate */ + uint32_t next_req = info.actualRate * 1.1; + + if (next_req > info.desiredRate * 1.1) + next_req = info.desiredRate * 1.1; + + if (next_req > FT_TM_MAX_PEER_RATE) + next_req = FT_TM_MAX_PEER_RATE; + + if (next_req < FT_TM_MINIMUM_CHUNK) + next_req = FT_TM_MINIMUM_CHUNK; + + info.lastTS = ts; + + /* do request */ + uint64_t req_offset = 0; + if (getChunk(req_offset,next_req)) + { + if (next_req > 0) + { + info.state = PQIPEER_DOWNLOADING; + requestData(info.peerId,req_offset,next_req); + } + else + { + std::cerr << "transfermodule::Waiting for available data"; + std::cerr << std::endl; + } + } + else mFlag = 1; +} + + + + //interface to client module +bool ftTransferModule::locked_recvPeerData(peerInfo &info, uint64_t offset, + uint32_t chunk_size, void *data) +{ +#ifdef FT_DEBUG + std::cerr << "ftTransferModule::locked_recvPeerData()"; + std::cerr << " peerId: " << info.peerId; + std::cerr << " offset: " << offset; + std::cerr << " chunksize: " << chunk_size; + std::cerr << " data: " << data; + std::cerr << std::endl; +#endif + + time_t ts = time(NULL); + info.recvTS = ts; + info.state = PQIPEER_DOWNLOADING; + info.lastTransfers += chunk_size; + + return true; +} + diff --git a/libretroshare/src/ft/fttransfermodule.h b/libretroshare/src/ft/fttransfermodule.h index 43b166c00..dacc8fb20 100644 --- a/libretroshare/src/ft/fttransfermodule.h +++ b/libretroshare/src/ft/fttransfermodule.h @@ -72,13 +72,15 @@ class peerInfo { public: peerInfo(std::string peerId_in):peerId(peerId_in),state(PQIPEER_NOT_ONLINE),desiredRate(0),actualRate(0), - offset(0),chunkSize(TRANSFER_START_MIN),receivedSize(0),lastTS(0) + offset(0),chunkSize(TRANSFER_START_MIN),receivedSize(0),lastTS(0), + recvTS(0), lastTransfers(0) { return; } peerInfo(std::string peerId_in,uint32_t state_in,uint32_t maxRate_in): peerId(peerId_in),state(state_in),desiredRate(maxRate_in),actualRate(0), - offset(0),chunkSize(TRANSFER_START_MIN),receivedSize(0),lastTS(0) + offset(0),chunkSize(TRANSFER_START_MIN),receivedSize(0),lastTS(0), + recvTS(0), lastTransfers(0) { return; } @@ -94,7 +96,9 @@ public: //already received data size for current request uint32_t receivedSize; - time_t lastTS; + time_t lastTS; /* last Request */ + time_t recvTS; /* last Recv */ + uint32_t lastTransfers; /* data recvd in last second */ }; class ftFileStatus @@ -160,6 +164,11 @@ public: private: + bool locked_tickPeerTransfer(peerInfo &info); + bool locked_recvPeerData(peerInfo &info, uint64_t offset, + uint32_t chunk_size, void *data); + + /* These have independent Mutexes / are const locally (no Mutex protection)*/ ftFileCreator *mFileCreator; ftDataMultiplex *mMultiplexor; diff --git a/libretroshare/src/ft/pqitestor.cc b/libretroshare/src/ft/pqitestor.cc index f28489fd5..85756c40b 100644 --- a/libretroshare/src/ft/pqitestor.cc +++ b/libretroshare/src/ft/pqitestor.cc @@ -30,8 +30,6 @@ *#define HUB_DEBUG 1 *****/ -#define HUB_DEBUG 1 - P3Hub::P3Hub(uint32_t flags, RsSerialiser *rss) :mSerialiser(rss), mUseSerialiser(false) { diff --git a/libretroshare/src/scripts/config.mk b/libretroshare/src/scripts/config.mk index e7d11205f..9f22fa363 100644 --- a/libretroshare/src/scripts/config.mk +++ b/libretroshare/src/scripts/config.mk @@ -4,8 +4,8 @@ ########################################################################### #Define OS. # -#OS = Linux -OS = MacOSX +OS = Linux +#OS = MacOSX #OS = Cygwin #OS = Win # MinGw. ########################################################################### diff --git a/libretroshare/src/serialiser/rstlvfileitem.cc b/libretroshare/src/serialiser/rstlvfileitem.cc index 8bcc4adf0..90a347ffc 100644 --- a/libretroshare/src/serialiser/rstlvfileitem.cc +++ b/libretroshare/src/serialiser/rstlvfileitem.cc @@ -34,6 +34,12 @@ * #define TLV_FI_DEBUG 1 **/ + +RsTlvFileItem::RsTlvFileItem() +{ + TlvClear(); +} + void RsTlvFileItem::TlvClear() { filesize = 0; diff --git a/libretroshare/src/serialiser/rstlvtypes.h b/libretroshare/src/serialiser/rstlvtypes.h index c1337b2ab..c66674fec 100644 --- a/libretroshare/src/serialiser/rstlvtypes.h +++ b/libretroshare/src/serialiser/rstlvtypes.h @@ -87,7 +87,7 @@ bool setBinData(void *data, uint16_t size); class RsTlvFileItem: public RsTlvItem { public: - RsTlvFileItem() { return; } + RsTlvFileItem(); virtual ~RsTlvFileItem() { return; } virtual uint16_t TlvSize(); virtual void TlvClear();