mirror of
https://github.com/RetroShare/RetroShare.git
synced 2024-12-23 22:49:37 -05:00
Bugfixes and extra debugging for file transfer.
git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@791 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
parent
6343de176e
commit
80f4686100
@ -34,6 +34,7 @@
|
||||
* #define CS_DEBUG 1
|
||||
***/
|
||||
|
||||
#define CS_DEBUG 1
|
||||
|
||||
bool operator<(const CacheId &a, const CacheId &b)
|
||||
{
|
||||
@ -209,7 +210,7 @@ CacheStore::CacheStore(uint16_t t, bool m,
|
||||
void CacheStore::lockData() const
|
||||
{
|
||||
#ifdef CS_DEBUG
|
||||
std::cerr << "CacheStore::lockData()" << std::endl;
|
||||
// std::cerr << "CacheStore::lockData()" << std::endl;
|
||||
#endif
|
||||
cMutex.lock();
|
||||
}
|
||||
@ -217,7 +218,7 @@ void CacheStore::lockData() const
|
||||
void CacheStore::unlockData() const
|
||||
{
|
||||
#ifdef CS_DEBUG
|
||||
std::cerr << "CacheStore::unlockData()" << std::endl;
|
||||
// std::cerr << "CacheStore::unlockData()" << std::endl;
|
||||
#endif
|
||||
cMutex.unlock();
|
||||
}
|
||||
@ -1050,15 +1051,29 @@ bool CacheTransfer::CompletedCache(std::string hash)
|
||||
std::map<std::string, CacheData>::iterator dit;
|
||||
std::map<std::string, CacheStore *>::iterator sit;
|
||||
|
||||
#ifdef CS_DEBUG
|
||||
std::cerr << "CacheTransfer::CompletedCache(" << hash << ")";
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
|
||||
/* find in store.... */
|
||||
sit = cbStores.find(hash);
|
||||
dit = cbData.find(hash);
|
||||
|
||||
if ((sit == cbStores.end()) || (dit == cbData.end()))
|
||||
{
|
||||
#ifdef CS_DEBUG
|
||||
std::cerr << "CacheTransfer::CompletedCache() Failed to find it";
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
#ifdef CS_DEBUG
|
||||
std::cerr << "CacheTransfer::CompletedCache() callback to store";
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
/* callback */
|
||||
(sit -> second) -> downloadedCache(dit->second);
|
||||
|
||||
|
@ -963,6 +963,10 @@ int DirEntry::saveEntry(std::ostringstream &oss)
|
||||
|
||||
int FileIndex::searchHash(std::string hash, std::list<FileEntry *> &results) const
|
||||
{
|
||||
#ifdef FI_DEBUG
|
||||
std::cerr << "FileIndex::searchHash(" << hash << ")";
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
DirEntry *ndir = NULL;
|
||||
std::list<DirEntry *> dirlist;
|
||||
dirlist.push_back(root);
|
||||
@ -985,6 +989,11 @@ int FileIndex::searchHash(std::string hash, std::list<FileEntry *> &results) con
|
||||
if (hash == (fit->second)->hash)
|
||||
{
|
||||
results.push_back(fit->second);
|
||||
#ifdef FI_DEBUG
|
||||
std::cerr << "FileIndex::searchHash(" << hash << ")";
|
||||
std::cerr << " found: " << fit->second->name;
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -44,6 +44,8 @@ FileIndexStore::~FileIndexStore()
|
||||
* #define FIS_DEBUG 1
|
||||
**/
|
||||
|
||||
#define FIS_DEBUG 1
|
||||
|
||||
/* actual load, once data available */
|
||||
int FileIndexStore::loadCache(const CacheData &data)
|
||||
{
|
||||
@ -353,6 +355,10 @@ int FileIndexStore::SearchHash(std::string hash, std::list<FileDetail> &results)
|
||||
#endif
|
||||
for(pit = indices.begin(); pit != indices.end(); pit++)
|
||||
{
|
||||
#ifdef FIS_DEBUG
|
||||
std::cerr << "FileIndexStore::SearchHash() Searching: Peer ";
|
||||
std::cerr << pit->first << std::endl;
|
||||
#endif
|
||||
firesults.clear();
|
||||
|
||||
(pit->second)->searchHash(hash, firesults);
|
||||
@ -373,6 +379,12 @@ int FileIndexStore::SearchHash(std::string hash, std::list<FileDetail> &results)
|
||||
|
||||
}
|
||||
|
||||
|
||||
#ifdef FIS_DEBUG
|
||||
std::cerr << "FileIndexStore::SearchHash() Found " << results.size();
|
||||
std::cerr << " Results from " << indices.size() << " Peers" << std::endl;
|
||||
#endif
|
||||
|
||||
unlockData();
|
||||
return results.size();
|
||||
}
|
||||
|
@ -218,23 +218,50 @@ bool ftController::completeFile(std::string hash)
|
||||
/* If it has a callback - do it now */
|
||||
if (fc->mDoCallback)
|
||||
{
|
||||
#ifdef CONTROL_DEBUG
|
||||
std::cerr << "ftController::completeFile() doing Callback";
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
|
||||
switch (fc->mCallbackCode)
|
||||
{
|
||||
case CB_CODE_CACHE:
|
||||
/* callback */
|
||||
if (fc->mState == ftFileControl::COMPLETED)
|
||||
{
|
||||
#ifdef CONTROL_DEBUG
|
||||
std::cerr << "ftController::completeFile() doing Callback : Success";
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
|
||||
CompletedCache(fc->mHash);
|
||||
}
|
||||
else
|
||||
{
|
||||
#ifdef CONTROL_DEBUG
|
||||
std::cerr << "ftController::completeFile() Cache Callback : Failed";
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
FailedCache(fc->mHash);
|
||||
}
|
||||
break;
|
||||
case CB_CODE_MEDIA:
|
||||
#ifdef CONTROL_DEBUG
|
||||
std::cerr << "ftController::completeFile() NULL MEDIA callback";
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
break;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
#ifdef CONTROL_DEBUG
|
||||
std::cerr << "ftController::completeFile() No callback";
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
/* switch map */
|
||||
@ -249,6 +276,8 @@ bool ftController::completeFile(std::string hash)
|
||||
/********************** Controller Access **********************/
|
||||
/***************************************************************/
|
||||
|
||||
const uint32_t FT_CNTRL_STANDARD_RATE = 100 * 1024;
|
||||
|
||||
bool ftController::FileRequest(std::string fname, std::string hash,
|
||||
uint64_t size, std::string dest, uint32_t flags,
|
||||
std::list<std::string> &srcIds)
|
||||
@ -378,7 +407,8 @@ bool ftController::FileRequest(std::string fname, std::string hash,
|
||||
#endif
|
||||
//tm->setPeerState(*it, RS_FILE_RATE_FAST |
|
||||
// RS_FILE_PEER_ONLINE, 100000);
|
||||
tm->setPeerState(*it, PQIPEER_IDLE, 10000);
|
||||
//tm->setPeerState(*it, PQIPEER_IDLE, 10000);
|
||||
tm->setPeerState(*it, PQIPEER_IDLE, FT_CNTRL_STANDARD_RATE);
|
||||
}
|
||||
else if (mConnMgr->isOnline(*it))
|
||||
{
|
||||
@ -389,7 +419,8 @@ bool ftController::FileRequest(std::string fname, std::string hash,
|
||||
#endif
|
||||
//tm->setPeerState(*it, RS_FILE_RATE_TRICKLE |
|
||||
// RS_FILE_PEER_ONLINE, 10000);
|
||||
tm->setPeerState(*it, PQIPEER_IDLE, 10000);
|
||||
//tm->setPeerState(*it, PQIPEER_IDLE, 10000);
|
||||
tm->setPeerState(*it, PQIPEER_IDLE, FT_CNTRL_STANDARD_RATE);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -399,7 +430,7 @@ bool ftController::FileRequest(std::string fname, std::string hash,
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
//tm->setPeerState(*it, RS_FILE_PEER_OFFLINE, 10000);
|
||||
tm->setPeerState(*it, PQIPEER_NOT_ONLINE, 10000);
|
||||
tm->setPeerState(*it, PQIPEER_NOT_ONLINE, FT_CNTRL_STANDARD_RATE);
|
||||
}
|
||||
}
|
||||
|
||||
@ -412,6 +443,7 @@ bool ftController::FileRequest(std::string fname, std::string hash,
|
||||
}
|
||||
|
||||
|
||||
|
||||
bool ftController::FileCancel(std::string hash)
|
||||
{
|
||||
#ifdef CONTROL_DEBUG
|
||||
|
@ -44,6 +44,8 @@ const double DMULTIPLEX_RELAX = 0.5; /* ??? */
|
||||
* #define MPLEX_DEBUG 1
|
||||
*****/
|
||||
|
||||
#define MPLEX_DEBUG 1
|
||||
|
||||
ftClient::ftClient(ftTransferModule *module, ftFileCreator *creator)
|
||||
:mModule(module), mCreator(creator)
|
||||
{
|
||||
@ -120,11 +122,24 @@ bool ftDataMultiplex::FileDownloads(std::list<std::string> &hashs)
|
||||
|
||||
bool ftDataMultiplex::FileDetails(std::string hash, uint32_t hintsflag, FileInfo &info)
|
||||
{
|
||||
#ifdef MPLEX_DEBUG
|
||||
std::cerr << "ftDataMultiplex::FileDetails(";
|
||||
std::cerr << hash << ", " << hintsflag << ")";
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
|
||||
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
|
||||
std::map<std::string, ftFileProvider *>::iterator sit;
|
||||
sit = mServers.find(hash);
|
||||
if (sit != mServers.end())
|
||||
{
|
||||
|
||||
#ifdef MPLEX_DEBUG
|
||||
std::cerr << "ftDataMultiplex::FileDetails()";
|
||||
std::cerr << " Found ftFileProvider!";
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
|
||||
(sit->second)->FileDetails(info);
|
||||
return true;
|
||||
}
|
||||
@ -132,11 +147,24 @@ bool ftDataMultiplex::FileDetails(std::string hash, uint32_t hintsflag, FileI
|
||||
std::map<std::string, ftClient>::iterator cit;
|
||||
if (mClients.end() != (cit = mClients.find(hash)))
|
||||
{
|
||||
|
||||
#ifdef MPLEX_DEBUG
|
||||
std::cerr << "ftDataMultiplex::FileDetails()";
|
||||
std::cerr << " Found ftFileCreator!";
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
|
||||
//(cit->second).mModule->FileDetails(info);
|
||||
(cit->second).mCreator->FileDetails(info);
|
||||
return true;
|
||||
}
|
||||
|
||||
#ifdef MPLEX_DEBUG
|
||||
std::cerr << "ftDataMultiplex::FileDetails()";
|
||||
std::cerr << " Found nothing";
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -410,6 +438,7 @@ bool ftDataMultiplex::locked_handleServerRequest(ftFileProvider *provider,
|
||||
std::cerr << " FAILED";
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
free(data);
|
||||
|
||||
return false;
|
||||
}
|
||||
@ -420,6 +449,13 @@ bool ftDataMultiplex::handleSearchRequest(std::string peerId,
|
||||
uint64_t offset, uint32_t chunksize)
|
||||
{
|
||||
|
||||
|
||||
#ifdef MPLEX_DEBUG
|
||||
std::cerr << "ftDataMultiplex::handleSearchRequest(";
|
||||
std::cerr << peerId << ", " << hash << ", " << size << "...)";
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
|
||||
{
|
||||
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
|
||||
|
||||
@ -427,6 +463,13 @@ bool ftDataMultiplex::handleSearchRequest(std::string peerId,
|
||||
std::map<std::string, time_t>::iterator bit;
|
||||
if (mUnknownHashs.end() != (bit = mUnknownHashs.find(hash)))
|
||||
{
|
||||
|
||||
#ifdef MPLEX_DEBUG
|
||||
std::cerr << "ftDataMultiplex::handleSearchRequest(";
|
||||
std::cerr << " Found Ignore Hash ... done";
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
|
||||
/* We've previously rejected this one, so ignore */
|
||||
return false;
|
||||
}
|
||||
@ -439,6 +482,7 @@ bool ftDataMultiplex::handleSearchRequest(std::string peerId,
|
||||
* (anywhere but remote really)
|
||||
*/
|
||||
|
||||
|
||||
FileInfo info;
|
||||
uint32_t hintflags = (RS_FILE_HINTS_CACHE |
|
||||
RS_FILE_HINTS_EXTRA |
|
||||
@ -448,6 +492,13 @@ bool ftDataMultiplex::handleSearchRequest(std::string peerId,
|
||||
if (mSearch->search(hash, size, hintflags, info))
|
||||
{
|
||||
|
||||
#ifdef MPLEX_DEBUG
|
||||
std::cerr << "ftDataMultiplex::handleSearchRequest(";
|
||||
std::cerr << " Found Local File, sharing...";
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
|
||||
|
||||
/* setup a new provider */
|
||||
RsStackMutex stack(dataMtx); /******* LOCK MUTEX ******/
|
||||
|
||||
@ -459,6 +510,14 @@ bool ftDataMultiplex::handleSearchRequest(std::string peerId,
|
||||
/* handle request finally */
|
||||
locked_handleServerRequest(provider,
|
||||
peerId, hash, size, offset, chunksize);
|
||||
|
||||
|
||||
/* now we should should check if any further requests for the same
|
||||
* file exists ... (can happen with caches!)
|
||||
*
|
||||
* but easier to check pre-search....
|
||||
*/
|
||||
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
@ -352,6 +352,7 @@ INITTEST();
|
||||
void *do_server_test_thread(void *data)
|
||||
{
|
||||
TestData *mFt = (TestData *) data;
|
||||
time_t startTS = time(NULL);
|
||||
|
||||
std::cerr << "do_server_test_thread() running";
|
||||
std::cerr << std::endl;
|
||||
@ -368,9 +369,10 @@ void *do_server_test_thread(void *data)
|
||||
}
|
||||
|
||||
|
||||
for(int i = 0; i < 60; i++)
|
||||
for(int i = 0; i < 90; i++)
|
||||
{
|
||||
std::cerr << "Waiting 60 seconds to share caches";
|
||||
int age = time(NULL) - startTS;
|
||||
std::cerr << "Waited " << age << " seconds to share caches";
|
||||
std::cerr << std::endl;
|
||||
sleep(1);
|
||||
}
|
||||
@ -436,7 +438,8 @@ void *do_server_test_thread(void *data)
|
||||
/*** Now Download it! ***/
|
||||
std::list<std::string> srcIds;
|
||||
//srcIds.push_back(sFile.id);
|
||||
srcIds.push_back(oId);
|
||||
// Don't add srcId - to test whether the search works - or not
|
||||
//srcIds.push_back(oId);
|
||||
if (foundFile)
|
||||
{
|
||||
mFt->loadServer->FileRequest(sFile.name, sFile.hash,
|
||||
@ -444,75 +447,14 @@ void *do_server_test_thread(void *data)
|
||||
}
|
||||
|
||||
/* Give it a while to transfer */
|
||||
for(int i = 0; i < 300; i++)
|
||||
for(int i = 0; i < 100; i++)
|
||||
{
|
||||
std::cerr << "Waited " << i * 10 << " seconds for transfer";
|
||||
int age = time(NULL) - startTS;
|
||||
std::cerr << "Waited " << age << " seconds for tranfer";
|
||||
std::cerr << std::endl;
|
||||
sleep(10);
|
||||
}
|
||||
|
||||
#if 0
|
||||
bool
|
||||
while(!mFt->loadServer->ExtraFileStatus(*eit, info))
|
||||
{
|
||||
|
||||
/* max of 30 seconds */
|
||||
now = time(NULL);
|
||||
if (now - start > 30)
|
||||
{
|
||||
/* FAIL */
|
||||
REPORT2( false, "Extra File Hashing");
|
||||
}
|
||||
|
||||
sleep(1);
|
||||
}
|
||||
|
||||
/* Got ExtraFileStatus */
|
||||
REPORT("Successfully Found ExtraFile");
|
||||
|
||||
/* now we can try a search (should succeed) */
|
||||
uint32_t hintflags = 0;
|
||||
if (mFt->loadServer->FileDetails(info.hash, hintflags, info2))
|
||||
{
|
||||
CHECK(info2.hash == info.hash);
|
||||
CHECK(info2.size == info.size);
|
||||
CHECK(info2.fname == info.fname);
|
||||
}
|
||||
else
|
||||
{
|
||||
REPORT2( false, "Search for Extra File (Basic)");
|
||||
}
|
||||
|
||||
/* search with flags (should succeed) */
|
||||
hintflags = RS_FILE_HINTS_EXTRA;
|
||||
if (mFt->loadServer->FileDetails(info.hash, hintflags, info2))
|
||||
{
|
||||
CHECK(info2.hash == info.hash);
|
||||
CHECK(info2.size == info.size);
|
||||
CHECK(info2.fname == info.fname);
|
||||
}
|
||||
else
|
||||
{
|
||||
REPORT2( false, "Search for Extra File (Extra Flag)");
|
||||
}
|
||||
|
||||
/* search with other flags (should fail) */
|
||||
hintflags = RS_FILE_HINTS_REMOTE | RS_FILE_HINTS_SPEC_ONLY;
|
||||
if (mFt->loadServer->FileDetails(info.hash, hintflags, info2))
|
||||
{
|
||||
REPORT2( false, "Search for Extra File (Fail Flags)");
|
||||
}
|
||||
else
|
||||
{
|
||||
REPORT("Search for Extra File (Fail Flags)");
|
||||
}
|
||||
|
||||
/* if we try to download it ... should just find existing one*/
|
||||
|
||||
REPORT("Testing with Extra File");
|
||||
}
|
||||
#endif
|
||||
|
||||
FINALREPORT("Shared Directories, Bool Search, multi-source transfers");
|
||||
exit(1);
|
||||
}
|
||||
|
@ -31,6 +31,24 @@
|
||||
|
||||
#include "fttransfermodule.h"
|
||||
|
||||
/*************************************************************************
|
||||
* Notes on file transfer strategy.
|
||||
* Care must be taken not to overload pipe. best way is to time requests.
|
||||
* and according adjust data rate.
|
||||
*
|
||||
* each peer gets a 'max_rate' which is decided on the type of transfer.
|
||||
* - trickle ...
|
||||
* - stream ...
|
||||
* - max ...
|
||||
*
|
||||
* Each peer is independently managed.
|
||||
*
|
||||
* via the functions:
|
||||
*
|
||||
*/
|
||||
|
||||
const double FT_TM_MAX_PEER_RATE = 1024 * 1024; /* 1MB/s */
|
||||
|
||||
ftTransferModule::ftTransferModule(ftFileCreator *fc, ftDataMultiplex *dm, ftController *c)
|
||||
:mFileCreator(fc), mMultiplexor(dm), mFtController(c), mFlag(0)
|
||||
{
|
||||
@ -43,6 +61,8 @@ ftTransferModule::ftTransferModule(ftFileCreator *fc, ftDataMultiplex *dm, ftCon
|
||||
// Dummy for Testing (should be handled independantly for
|
||||
// each peer.
|
||||
//mChunkSize = 10000;
|
||||
desiredRate = 1000000; /* 1MB/s ??? */
|
||||
actualRate = 0;
|
||||
return;
|
||||
}
|
||||
|
||||
@ -108,10 +128,10 @@ bool ftTransferModule::setPeerState(std::string peerId,uint32_t state,uint32_t m
|
||||
|
||||
(mit->second).state=state;
|
||||
(mit->second).desiredRate=maxRate;
|
||||
(mit->second).actualRate=maxRate; /* should give big kick in right direction */
|
||||
|
||||
std::list<std::string>::iterator it;
|
||||
it=mOnlinePeers.begin();
|
||||
while((it!=mOnlinePeers.end())&&(*it!=peerId)) it++;
|
||||
it = std::find(mOnlinePeers.begin(), mOnlinePeers.end(), peerId);
|
||||
|
||||
if (state!=PQIPEER_NOT_ONLINE)
|
||||
{
|
||||
@ -180,6 +200,8 @@ bool ftTransferModule::recvFileData(std::string peerId, uint64_t offset,
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
|
||||
bool ok = false;
|
||||
|
||||
{
|
||||
RsStackMutex stack(tfMtx); /******* STACK LOCKED ******/
|
||||
|
||||
@ -195,33 +217,13 @@ bool ftTransferModule::recvFileData(std::string peerId, uint64_t offset,
|
||||
#endif
|
||||
return false;
|
||||
}
|
||||
ok = locked_recvPeerData(mit->second, offset, chunk_size, data);
|
||||
|
||||
if ((mit->second).state != PQIPEER_DOWNLOADING)
|
||||
{
|
||||
#ifdef FT_DEBUG
|
||||
std::cerr << "ftTransferModule::recvFileData()";
|
||||
std::cerr << " peer not downloading???";
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
//return false;
|
||||
}
|
||||
|
||||
if (offset != ((mit->second).offset + (mit->second).receivedSize))
|
||||
{
|
||||
//fix me
|
||||
//received data not expected
|
||||
#ifdef FT_DEBUG
|
||||
std::cerr << "ftTransferModule::recvFileData()";
|
||||
std::cerr << " offset != offset + recvdSize";
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
return false;
|
||||
}
|
||||
|
||||
(mit->second).receivedSize += chunk_size;
|
||||
(mit->second).state = PQIPEER_IDLE;
|
||||
} /***** STACK MUTEX END ****/
|
||||
return storeData(offset, chunk_size, data);
|
||||
|
||||
if (ok)
|
||||
storeData(offset, chunk_size, data);
|
||||
return ok;
|
||||
}
|
||||
|
||||
void ftTransferModule::requestData(std::string peerId, uint64_t offset, uint32_t chunk_size)
|
||||
@ -308,85 +310,11 @@ bool ftTransferModule::queryInactive()
|
||||
return false;
|
||||
}
|
||||
|
||||
int ts = time(NULL);
|
||||
uint64_t req_offset;
|
||||
uint32_t req_size;
|
||||
int delta;
|
||||
|
||||
std::map<std::string,peerInfo>::iterator mit;
|
||||
for(mit = mFileSources.begin(); mit != mFileSources.end(); mit++)
|
||||
{
|
||||
std::string peerId = mit->first;
|
||||
peerInfo* pInfo = &mit->second;
|
||||
switch (pInfo->state)
|
||||
{
|
||||
//Peer side has change from online to offline during transfer
|
||||
case PQIPEER_NOT_ONLINE:
|
||||
break;
|
||||
|
||||
//file request has been sent to peer side, but no response received yet
|
||||
case PQIPEER_DOWNLOADING:
|
||||
if (ts - (pInfo->lastTS) < PQIPEER_DOWNLOAD_CHECK)
|
||||
{
|
||||
/* if not timed out yet.... ignore */
|
||||
actualRate += pInfo->actualRate;
|
||||
break;
|
||||
locked_tickPeerTransfer(mit->second);
|
||||
}
|
||||
|
||||
/* otherwise fall through to request it again (with getChunk);
|
||||
*/
|
||||
|
||||
//file response received or peer side is just ready for download
|
||||
case PQIPEER_IDLE:
|
||||
pInfo->actualRate = pInfo->chunkSize/(ts-(pInfo->lastTS));
|
||||
|
||||
if (pInfo->actualRate < pInfo->desiredRate)
|
||||
{
|
||||
if (pInfo->actualRate < pInfo->desiredRate/2)
|
||||
{
|
||||
req_size = pInfo->chunkSize * 2 ;
|
||||
}
|
||||
else
|
||||
{
|
||||
req_size = (uint32_t ) (pInfo->chunkSize * 1.1) ;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
req_size = (uint32_t ) (pInfo->chunkSize * 0.9) ;
|
||||
}
|
||||
|
||||
if (getChunk(req_offset,req_size))
|
||||
{
|
||||
if (req_size > 0)
|
||||
{
|
||||
pInfo->offset = req_offset;
|
||||
pInfo->chunkSize = req_size;
|
||||
pInfo->lastTS = ts;
|
||||
pInfo->state = PQIPEER_DOWNLOADING;
|
||||
pInfo->receivedSize = 0;
|
||||
requestData(peerId,req_offset,req_size);
|
||||
}
|
||||
else
|
||||
{
|
||||
std::cerr << "transfermodule::Waiting for data to be available";
|
||||
std::cerr << std::endl;
|
||||
}
|
||||
}
|
||||
else mFlag = 1;
|
||||
|
||||
actualRate += pInfo->actualRate;
|
||||
break;
|
||||
|
||||
//file transfer has been stopped
|
||||
case PQIPEER_SUSPEND:
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
}//switch
|
||||
}//for
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -442,11 +370,28 @@ bool ftTransferModule::completeFileTransfer()
|
||||
int ftTransferModule::tick()
|
||||
{
|
||||
#ifdef FT_DEBUG
|
||||
{
|
||||
RsStackMutex stack(tfMtx); /******* STACK LOCKED ******/
|
||||
|
||||
std::cerr << "ftTransferModule::tick()";
|
||||
std::cerr << " mFlag: " << mFlag;
|
||||
std::cerr << " mHash: " << mHash;
|
||||
std::cerr << " mSize: " << mSize;
|
||||
std::cerr << std::endl;
|
||||
|
||||
std::cerr << "Peers: ";
|
||||
std::map<std::string,peerInfo>::iterator it;
|
||||
for(it = mFileSources.begin(); it != mFileSources.end(); it++)
|
||||
{
|
||||
std::cerr << " " << it->first;
|
||||
}
|
||||
std::cerr << std::endl;
|
||||
|
||||
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
queryInactive();
|
||||
|
||||
uint32_t flags = 0;
|
||||
@ -480,43 +425,119 @@ void ftTransferModule::adjustSpeed()
|
||||
|
||||
std::map<std::string,peerInfo>::iterator mit;
|
||||
|
||||
#ifdef FT_DEBUG
|
||||
std::cerr << "ftTransferModule::adjustSpeed()";
|
||||
std::cerr << " Initial Desired Rate: " << desiredRate << " Actual Rate: " << actualRate;
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
|
||||
|
||||
|
||||
actualRate = 0;
|
||||
for(mit = mFileSources.begin(); mit != mFileSources.end(); mit++)
|
||||
{
|
||||
if (((mit->second).state == PQIPEER_DOWNLOADING)
|
||||
|| ((mit->second).state == PQIPEER_IDLE))
|
||||
{
|
||||
|
||||
#ifdef FT_DEBUG
|
||||
std::cerr << "ftTransferModule::adjustSpeed()";
|
||||
std::cerr << "\t" << mit->first << " Desired Rate: " << desiredRate << " Actual Rate: " << actualRate;
|
||||
std::cerr << "Peer: " << mit->first;
|
||||
std::cerr << " Desired Rate: " << (mit->second).desiredRate;
|
||||
std::cerr << " Actual Rate: " << (mit->second).actualRate;
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
actualRate += mit->second.actualRate;
|
||||
}
|
||||
|
||||
#ifdef FT_DEBUG
|
||||
std::cerr << "ftTransferModule::adjustSpeed() Totals:";
|
||||
std::cerr << "Desired Rate: " << desiredRate << " Actual Rate: " << actualRate;
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
|
||||
if ((actualRate < desiredRate) && ((mit->second).actualRate >= (mit->second).desiredRate))
|
||||
{
|
||||
(mit->second).desiredRate *= 1.1;
|
||||
}
|
||||
|
||||
if ((actualRate > desiredRate) && ((mit->second).actualRate < (mit->second).desiredRate))
|
||||
{
|
||||
(mit->second).desiredRate *= 0.9;
|
||||
}
|
||||
}
|
||||
}
|
||||
#ifdef FT_DEBUG
|
||||
std::cerr << "ftTransferModule::adjustSpeed()";
|
||||
std::cerr << " Initial Desired Rate: " << desiredRate << " Actual Rate: " << actualRate;
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
* Actual Peer Transfer Management Code.
|
||||
*
|
||||
* request very tick, at rate
|
||||
*
|
||||
*
|
||||
**/
|
||||
|
||||
const uint32_t FT_TM_MINIMUM_CHUNK = 1024; /* ie 1Kb / sec */
|
||||
const uint32_t FT_TM_RESTART_DOWNLOAD = 60; /* 60 seconds */
|
||||
const uint32_t FT_TM_DOWNLOAD_TIMEOUT = 5; /* 5 seconds */
|
||||
|
||||
bool ftTransferModule::locked_tickPeerTransfer(peerInfo &info)
|
||||
{
|
||||
/* how long has it been? */
|
||||
time_t ts = time(NULL);
|
||||
|
||||
int ageRecv = ts - info.recvTS;
|
||||
int ageReq = ts - info.lastTS;
|
||||
|
||||
if (ageReq > FT_TM_RESTART_DOWNLOAD)
|
||||
{
|
||||
info.state = PQIPEER_DOWNLOADING;
|
||||
info.recvTS = ts; /* reset to activate */
|
||||
ageRecv = 0;
|
||||
}
|
||||
|
||||
if (ageRecv > FT_TM_DOWNLOAD_TIMEOUT)
|
||||
{
|
||||
info.state = PQIPEER_IDLE;
|
||||
return false;
|
||||
}
|
||||
|
||||
/* update rate */
|
||||
info.actualRate = info.actualRate * 0.75 + 0.25 * info.lastTransfers;
|
||||
info.lastTransfers = 0;
|
||||
|
||||
/* request at 10% more than actual rate */
|
||||
uint32_t next_req = info.actualRate * 1.1;
|
||||
|
||||
if (next_req > info.desiredRate * 1.1)
|
||||
next_req = info.desiredRate * 1.1;
|
||||
|
||||
if (next_req > FT_TM_MAX_PEER_RATE)
|
||||
next_req = FT_TM_MAX_PEER_RATE;
|
||||
|
||||
if (next_req < FT_TM_MINIMUM_CHUNK)
|
||||
next_req = FT_TM_MINIMUM_CHUNK;
|
||||
|
||||
info.lastTS = ts;
|
||||
|
||||
/* do request */
|
||||
uint64_t req_offset = 0;
|
||||
if (getChunk(req_offset,next_req))
|
||||
{
|
||||
if (next_req > 0)
|
||||
{
|
||||
info.state = PQIPEER_DOWNLOADING;
|
||||
requestData(info.peerId,req_offset,next_req);
|
||||
}
|
||||
else
|
||||
{
|
||||
std::cerr << "transfermodule::Waiting for available data";
|
||||
std::cerr << std::endl;
|
||||
}
|
||||
}
|
||||
else mFlag = 1;
|
||||
}
|
||||
|
||||
|
||||
|
||||
//interface to client module
|
||||
bool ftTransferModule::locked_recvPeerData(peerInfo &info, uint64_t offset,
|
||||
uint32_t chunk_size, void *data)
|
||||
{
|
||||
#ifdef FT_DEBUG
|
||||
std::cerr << "ftTransferModule::locked_recvPeerData()";
|
||||
std::cerr << " peerId: " << info.peerId;
|
||||
std::cerr << " offset: " << offset;
|
||||
std::cerr << " chunksize: " << chunk_size;
|
||||
std::cerr << " data: " << data;
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
|
||||
time_t ts = time(NULL);
|
||||
info.recvTS = ts;
|
||||
info.state = PQIPEER_DOWNLOADING;
|
||||
info.lastTransfers += chunk_size;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -72,13 +72,15 @@ class peerInfo
|
||||
{
|
||||
public:
|
||||
peerInfo(std::string peerId_in):peerId(peerId_in),state(PQIPEER_NOT_ONLINE),desiredRate(0),actualRate(0),
|
||||
offset(0),chunkSize(TRANSFER_START_MIN),receivedSize(0),lastTS(0)
|
||||
offset(0),chunkSize(TRANSFER_START_MIN),receivedSize(0),lastTS(0),
|
||||
recvTS(0), lastTransfers(0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
peerInfo(std::string peerId_in,uint32_t state_in,uint32_t maxRate_in):
|
||||
peerId(peerId_in),state(state_in),desiredRate(maxRate_in),actualRate(0),
|
||||
offset(0),chunkSize(TRANSFER_START_MIN),receivedSize(0),lastTS(0)
|
||||
offset(0),chunkSize(TRANSFER_START_MIN),receivedSize(0),lastTS(0),
|
||||
recvTS(0), lastTransfers(0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
@ -94,7 +96,9 @@ public:
|
||||
//already received data size for current request
|
||||
uint32_t receivedSize;
|
||||
|
||||
time_t lastTS;
|
||||
time_t lastTS; /* last Request */
|
||||
time_t recvTS; /* last Recv */
|
||||
uint32_t lastTransfers; /* data recvd in last second */
|
||||
};
|
||||
|
||||
class ftFileStatus
|
||||
@ -160,6 +164,11 @@ public:
|
||||
|
||||
private:
|
||||
|
||||
bool locked_tickPeerTransfer(peerInfo &info);
|
||||
bool locked_recvPeerData(peerInfo &info, uint64_t offset,
|
||||
uint32_t chunk_size, void *data);
|
||||
|
||||
|
||||
/* These have independent Mutexes / are const locally (no Mutex protection)*/
|
||||
ftFileCreator *mFileCreator;
|
||||
ftDataMultiplex *mMultiplexor;
|
||||
|
@ -30,8 +30,6 @@
|
||||
*#define HUB_DEBUG 1
|
||||
*****/
|
||||
|
||||
#define HUB_DEBUG 1
|
||||
|
||||
P3Hub::P3Hub(uint32_t flags, RsSerialiser *rss)
|
||||
:mSerialiser(rss), mUseSerialiser(false)
|
||||
{
|
||||
|
@ -4,8 +4,8 @@
|
||||
###########################################################################
|
||||
#Define OS.
|
||||
#
|
||||
#OS = Linux
|
||||
OS = MacOSX
|
||||
OS = Linux
|
||||
#OS = MacOSX
|
||||
#OS = Cygwin
|
||||
#OS = Win # MinGw.
|
||||
###########################################################################
|
||||
|
@ -34,6 +34,12 @@
|
||||
* #define TLV_FI_DEBUG 1
|
||||
**/
|
||||
|
||||
|
||||
RsTlvFileItem::RsTlvFileItem()
|
||||
{
|
||||
TlvClear();
|
||||
}
|
||||
|
||||
void RsTlvFileItem::TlvClear()
|
||||
{
|
||||
filesize = 0;
|
||||
|
@ -87,7 +87,7 @@ bool setBinData(void *data, uint16_t size);
|
||||
class RsTlvFileItem: public RsTlvItem
|
||||
{
|
||||
public:
|
||||
RsTlvFileItem() { return; }
|
||||
RsTlvFileItem();
|
||||
virtual ~RsTlvFileItem() { return; }
|
||||
virtual uint16_t TlvSize();
|
||||
virtual void TlvClear();
|
||||
|
Loading…
Reference in New Issue
Block a user