More bugfixes ... got the basic channels file transfer working.

* Create channels directory correctly.
 * added File Transfers to Config List.
 * connected statusChange() monitor callback.
 * fixed file sources in transfermodule.
 * fixed up transfer restarts / sleeps.
 * enabled opening files read only.
 * disabled some of the debug.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@799 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2008-11-13 23:03:46 +00:00
parent b0d462c93e
commit fce83cb232
16 changed files with 322 additions and 229 deletions

View file

@ -352,6 +352,7 @@ bool ftController::FileRequest(std::string fname, std::string hash,
std::cerr << std::endl;
#endif
std::string ownId = mConnMgr->getOwnId();
uint32_t rate = 0;
if (flags & RS_FILE_HINTS_BACKGROUND)
{
@ -398,12 +399,20 @@ bool ftController::FileRequest(std::string fname, std::string hash,
std::cerr << "ftController::FileRequest() Adding Peer: " << *it;
std::cerr << std::endl;
#endif
/* add peer */
(dit->second).mTransfer->setPeerState(*it,
PQIPEER_IDLE, rate);
return true;
(dit->second).mTransfer->addFileSource(*it);
setPeerState(dit->second.mTransfer, *it,
rate, mConnMgr->isOnline(*it));
}
if (srcIds.size() == 0)
{
#ifdef CONTROL_DEBUG
std::cerr << "ftController::FileRequest() WARNING: No Src Peers";
std::cerr << std::endl;
#endif
}
return true;
}
bool doCallback = false;
@ -504,6 +513,13 @@ bool ftController::FileRequest(std::string fname, std::string hash,
ftFileControl ftfc(fname, savepath, destination,
size, hash, flags, fc, tm, callbackCode);
#ifdef CONTROL_DEBUG
std::cerr << "ftController::FileRequest() Created ftFileCreator @: " << fc;
std::cerr << std::endl;
std::cerr << "ftController::FileRequest() Created ftTransModule @: " << tm;
std::cerr << std::endl;
#endif
/* add to ClientModule */
mDataplex->addTransferModule(tm, fc);
@ -511,43 +527,13 @@ bool ftController::FileRequest(std::string fname, std::string hash,
tm->setFileSources(srcIds);
/* get current state for transfer module */
std::string ownId = mConnMgr->getOwnId();
for(it = srcIds.begin(); it != srcIds.end(); it++)
{
if (*it == ownId)
{
#ifdef CONTROL_DEBUG
std::cerr << "ftController::FileRequest()";
std::cerr << *it << " is Self - set high rate";
std::cerr << std::endl;
std::cerr << "ftController::FileRequest() adding peer: " << *it;
std::cerr << std::endl;
#endif
//tm->setPeerState(*it, RS_FILE_RATE_FAST |
// RS_FILE_PEER_ONLINE, 100000);
//tm->setPeerState(*it, PQIPEER_IDLE, 10000);
tm->setPeerState(*it, PQIPEER_IDLE, rate);
}
else if (mConnMgr->isOnline(*it))
{
#ifdef CONTROL_DEBUG
std::cerr << "ftController::FileRequest()";
std::cerr << *it << " is Online";
std::cerr << std::endl;
#endif
//tm->setPeerState(*it, RS_FILE_RATE_TRICKLE |
// RS_FILE_PEER_ONLINE, 10000);
//tm->setPeerState(*it, PQIPEER_IDLE, 10000);
tm->setPeerState(*it, PQIPEER_IDLE, rate);
}
else
{
#ifdef CONTROL_DEBUG
std::cerr << "ftController::FileRequest()";
std::cerr << *it << " is Offline";
std::cerr << std::endl;
#endif
//tm->setPeerState(*it, RS_FILE_PEER_OFFLINE, 10000);
tm->setPeerState(*it, PQIPEER_IDLE, rate);
}
setPeerState(tm, *it, rate, mConnMgr->isOnline(*it));
}
/* only need to lock before to fiddle with own variables */
@ -559,6 +545,39 @@ bool ftController::FileRequest(std::string fname, std::string hash,
}
bool ftController::setPeerState(ftTransferModule *tm, std::string id,
uint32_t maxrate, bool online)
{
if (id == mConnMgr->getOwnId())
{
#ifdef CONTROL_DEBUG
std::cerr << "ftController::setPeerState() is Self";
std::cerr << std::endl;
#endif
tm->setPeerState(id, PQIPEER_IDLE, maxrate);
}
else if (online)
{
#ifdef CONTROL_DEBUG
std::cerr << "ftController::setPeerState()";
std::cerr << " Peer is Online";
std::cerr << std::endl;
#endif
tm->setPeerState(id, PQIPEER_IDLE, maxrate);
}
else
{
#ifdef CONTROL_DEBUG
std::cerr << "ftController::setPeerState()";
std::cerr << " Peer is Offline";
std::cerr << std::endl;
#endif
tm->setPeerState(id, PQIPEER_NOT_ONLINE, maxrate);
}
return true;
}
bool ftController::FileCancel(std::string hash)
{
@ -825,47 +844,57 @@ bool ftController::FileDetails(std::string hash, FileInfo &info)
*/
void ftController::statusChange(const std::list<pqipeer> &plist)
{
#if 0 /*** FIX ME !!!**************/
RsStackMutex stack(ctrlMutex); /******* LOCKED ********/
uint32_t rate = FT_CNTRL_STANDARD_RATE;
/* add online to all downloads */
std::map<std::string, ftFileControl>::iterator it;
std::list<pqipeer>::const_iterator pit;
for(it = mDownloads.begin(); it != mDownloads.end(); it++)
{
for(pit = plist.begin(); pit != plist.end(); pit++)
{
if (pit->actions | RS_PEER_CONNECTED)
{
((it->second).mTransfer)->setPeer(RS_FILE_PEER_ONLINE | RS_FILE_RATE_TRICKLE);
}
else if (pit->actions | RS_PEER_DISCONNECTED)
{
((it->second).mTransfer)->setPeer(RS_FILE_PEER_OFFLINE);
}
}
}
/* modify my list of peers */
for(pit = plist.begin(); pit != plist.end(); pit++)
{
if (pit->actions | RS_PEER_CONNECTED)
{
/* add in */
((it->second).mTransfer)->setPeer(RS_FILE_PEER_ONLINE | RS_FILE_RATE_TRICKLE);
}
else if (pit->actions | RS_PEER_DISCONNECTED)
{
((it->second).mTransfer)->setPeer(RS_FILE_PEER_OFFLINE);
}
}
#ifdef CONTROL_DEBUG
std::cerr << "ftController::statusChange()";
std::cerr << std::endl;
#endif
for(it = mDownloads.begin(); it != mDownloads.end(); it++)
{
#ifdef CONTROL_DEBUG
std::cerr << "ftController::statusChange() Updating Hash:";
std::cerr << it->first;
std::cerr << std::endl;
#endif
for(pit = plist.begin(); pit != plist.end(); pit++)
{
#ifdef CONTROL_DEBUG
std::cerr << "Peer: " << pit->id;
#endif
if (pit->actions & RS_PEER_CONNECTED)
{
#ifdef CONTROL_DEBUG
std::cerr << " is Newly Connected!";
std::cerr << std::endl;
#endif
setPeerState(it->second.mTransfer, pit->id, rate, true);
}
else if (pit->actions & RS_PEER_DISCONNECTED)
{
#ifdef CONTROL_DEBUG
std::cerr << " is Just disconnected!";
std::cerr << std::endl;
#endif
setPeerState(it->second.mTransfer, pit->id, rate, false);
}
else
{
#ifdef CONTROL_DEBUG
std::cerr << " had something happen to it: ";
std::cerr << pit-> actions;
std::cerr << std::endl;
#endif
setPeerState(it->second.mTransfer, pit->id, rate, false);
}
}
}
}
/* p3Config Interface */
RsSerialiser *ftController::setupSerialiser()

View file

@ -150,6 +150,9 @@ virtual bool loadList(std::list<RsItem *> load);
void checkDownloadQueue();
bool completeFile(std::string hash);
bool setPeerState(ftTransferModule *tm, std::string id,
uint32_t maxrate, bool online);
/* pointers to other components */
ftSearch *mSearch;

View file

@ -7,7 +7,7 @@
#define FILE_DEBUG 1
#define CHUNK_MAX_AGE 30
#define CHUNK_MAX_AGE 20
/***********************************************************
@ -147,20 +147,19 @@ int ftFileCreator::initializeFileAttrs()
/*
* check if the file exists
* cant use FileProviders verion because that opens readonly.
*/
if (ftFileProvider::initializeFileAttrs())
{
return 1;
}
RsStackMutex stack(ftcMutex); /********** STACK LOCKED MTX ******/
if (fd)
return 1;
/*
* check if the file exists
*/
{
std::cerr << "ftFileCreator::initializeFileAttrs() opening w+b";
std::cerr << "ftFileCreator::initializeFileAttrs() trying (r+b) ";
std::cerr << std::endl;
}
@ -168,15 +167,28 @@ int ftFileCreator::initializeFileAttrs()
* attempt to open file
*/
fd = fopen(file_name.c_str(), "w+b");
fd = fopen(file_name.c_str(), "r+b");
if (!fd)
{
std::cerr << "ftFileCreator::initializeFileAttrs()";
std::cerr << " Failed to open (w+b): "<< file_name << std::endl;
return 0;
std::cerr << "ftFileCreator::initializeFileAttrs() Failed to open (r+b): ";
std::cerr << file_name << std::endl;
std::cerr << "ftFileCreator::initializeFileAttrs() opening w+b";
std::cerr << std::endl;
/* try opening for write */
fd = fopen(file_name.c_str(), "w+b");
if (!fd)
{
std::cerr << "ftFileCreator::initializeFileAttrs()";
std::cerr << " Failed to open (w+b): "<< file_name << std::endl;
return 0;
}
}
/*
* if it opened, find it's length
* move to the end

View file

@ -64,8 +64,9 @@ bool ftFileProvider::getFileData(uint64_t offset, uint32_t chunk_size, void *dat
/*
* FIXME: Warning of comparison between unsigned and signed int?
*/
int data_size = chunk_size;
long base_loc = offset;
uint32_t data_size = chunk_size;
uint64_t base_loc = offset;
if (base_loc + data_size > mSize)
{
@ -138,13 +139,22 @@ int ftFileProvider::initializeFileAttrs()
* attempt to open file
*/
fd = fopen(file_name.c_str(), "r+b");
fd = fopen(file_name.c_str(), "rb");
if (!fd)
{
std::cerr << "ftFileProvider::initializeFileAttrs() Failed to open (r+b): ";
std::cerr << file_name << std::endl;
return 0;
/* try opening read only */
fd = fopen(file_name.c_str(), "rb");
if (!fd)
{
std::cerr << "ftFileProvider::initializeFileAttrs() Failed to open (rb): ";
std::cerr << file_name << std::endl;
/* try opening read only */
return 0;
}
}
/*

View file

@ -48,6 +48,7 @@
*/
const double FT_TM_MAX_PEER_RATE = 1024 * 1024; /* 1MB/s */
const uint32_t FT_TM_MAX_RESETS = 5;
ftTransferModule::ftTransferModule(ftFileCreator *fc, ftDataMultiplex *dm, ftController *c)
:mFileCreator(fc), mMultiplexor(dm), mFtController(c), mFlag(0)
@ -111,6 +112,38 @@ bool ftTransferModule::getFileSources(std::list<std::string> &peerIds)
return true;
}
bool ftTransferModule::addFileSource(std::string peerId)
{
RsStackMutex stack(tfMtx); /******* STACK LOCKED ******/
std::map<std::string,peerInfo>::iterator mit;
mit = mFileSources.find(peerId);
if (mit == mFileSources.end())
{
/* add in new source */
peerInfo pInfo(peerId);
mFileSources.insert(std::pair<std::string,peerInfo>(peerId,pInfo));
mit = mFileSources.find(peerId);
#ifdef FT_DEBUG
std::cerr << "ftTransferModule::addFileSource()";
std::cerr << " adding peer: " << peerId << " to sourceList";
std::cerr << std::endl;
#endif
}
else
{
#ifdef FT_DEBUG
std::cerr << "ftTransferModule::addFileSource()";
std::cerr << " peer: " << peerId << " already there";
std::cerr << std::endl;
#endif
}
}
bool ftTransferModule::setPeerState(std::string peerId,uint32_t state,uint32_t maxRate)
{
RsStackMutex stack(tfMtx); /******* STACK LOCKED ******/
@ -124,7 +157,17 @@ bool ftTransferModule::setPeerState(std::string peerId,uint32_t state,uint32_t m
std::map<std::string,peerInfo>::iterator mit;
mit = mFileSources.find(peerId);
if (mit == mFileSources.end()) return false;
if (mit == mFileSources.end())
{
/* add in new source */
#ifdef FT_DEBUG
std::cerr << "ftTransferModule::setPeerState()";
std::cerr << " adding new peer to sourceList";
std::cerr << std::endl;
#endif
return false;
}
(mit->second).state=state;
(mit->second).desiredRate=maxRate;
@ -458,9 +501,25 @@ void ftTransferModule::adjustSpeed()
**/
const uint32_t FT_TM_MINIMUM_CHUNK = 1024; /* ie 1Kb / sec */
const uint32_t FT_TM_RESTART_DOWNLOAD = 60; /* 60 seconds */
const uint32_t FT_TM_RESTART_DOWNLOAD = 10; /* 10 seconds */
const uint32_t FT_TM_DOWNLOAD_TIMEOUT = 5; /* 5 seconds */
/* NOTEs on this function...
* 1) This is the critical function for deciding the rate at which ft takes place.
* 2) Some of the peers might not have the file... care must be taken avoid deadlock.
*
* Eg. A edge case which fails badly.
* Small 1K file (one chunk), with 3 sources (A,B,C). A doesn't have file.
* (a) request data from A. B & C pause cos no more data needed.
* (b) all timeout, chunk reset... then back to request again (a) and repeat.
* (c) all timeout x 5 and are disabled.... no transfer, while B&C had it all the time.
*
* To solve this we might need random waiting periods, so each peer can
* be tried.
*
*
*/
bool ftTransferModule::locked_tickPeerTransfer(peerInfo &info)
{
/* how long has it been? */
@ -477,11 +536,31 @@ bool ftTransferModule::locked_tickPeerTransfer(peerInfo &info)
return false;
}
if (ageReq > FT_TM_RESTART_DOWNLOAD)
if (ageReq > FT_TM_RESTART_DOWNLOAD * (info.nResets + 1))
{
if (info.nResets > 1) /* 3rd timeout */
{
/* 90% chance of return false...
* will mean variations in which peer
* starts first. hopefully stop deadlocks.
*/
if (rand() % 10 != 0)
{
return false;
}
}
info.state = PQIPEER_DOWNLOADING;
info.recvTS = ts; /* reset to activate */
info.nResets++;
ageRecv = 0;
if (info.nResets >= FT_TM_MAX_RESETS)
{
/* for this file anyway */
info.state = PQIPEER_NOT_ONLINE;
return false;
}
}
if (ageRecv > FT_TM_DOWNLOAD_TIMEOUT)
@ -543,6 +622,7 @@ bool ftTransferModule::locked_recvPeerData(peerInfo &info, uint64_t offset,
time_t ts = time(NULL);
info.recvTS = ts;
info.nResets = 0;
info.state = PQIPEER_DOWNLOADING;
info.lastTransfers += chunk_size;

View file

@ -45,42 +45,25 @@
#include "util/rsthreads.h"
const int PQIPEER_INIT = 0x0000;
const int PQIPEER_NOT_ONLINE = 0x0001;
const int PQIPEER_DOWNLOADING = 0x0002;
const int PQIPEER_IDLE = 0x0004;
const int PQIPEER_SUSPEND = 0x0010;
const uint32_t PQIPEER_INIT = 0x0000;
const uint32_t PQIPEER_NOT_ONLINE = 0x0001;
const uint32_t PQIPEER_DOWNLOADING = 0x0002;
const uint32_t PQIPEER_IDLE = 0x0004;
const uint32_t PQIPEER_SUSPEND = 0x0010;
const uint32_t PQIPEER_OFFLINE_CHECK = 120; /* check every 2 minutes */
const uint32_t PQIPEER_DOWNLOAD_TIMEOUT = 60; /* time it out, -> offline after 60 secs */
const uint32_t PQIPEER_DOWNLOAD_CHECK = 10; /* desired delta = 10 secs */
const uint32_t PQIPEER_DOWNLOAD_TOO_FAST = 8; /* 8 secs */
const uint32_t PQIPEER_DOWNLOAD_TOO_SLOW = 12; /* 12 secs */
const uint32_t PQIPEER_DOWNLOAD_MIN_DELTA = 5; /* 5 secs */
const uint32_t TRANSFER_START_MIN = 10000; /* 10000 byte min limit */
const uint32_t TRANSFER_START_MAX = 10000; /* 10000 byte max limit */
/*
class Request
{
public:
uint64_t offset;
uint32_t chunkSize;
};
*/
class peerInfo
{
public:
peerInfo(std::string peerId_in):peerId(peerId_in),state(PQIPEER_NOT_ONLINE),desiredRate(0),actualRate(0),
offset(0),chunkSize(TRANSFER_START_MIN),receivedSize(0),lastTS(0),
recvTS(0), lastTransfers(0)
offset(0),chunkSize(0),receivedSize(0),lastTS(0),
recvTS(0), lastTransfers(0), nResets(0)
{
return;
}
peerInfo(std::string peerId_in,uint32_t state_in,uint32_t maxRate_in):
peerId(peerId_in),state(state_in),desiredRate(maxRate_in),actualRate(0),
offset(0),chunkSize(TRANSFER_START_MIN),receivedSize(0),lastTS(0),
recvTS(0), lastTransfers(0)
offset(0),chunkSize(0),receivedSize(0),lastTS(0),
recvTS(0), lastTransfers(0), nResets(0)
{
return;
}
@ -99,6 +82,7 @@ public:
time_t lastTS; /* last Request */
time_t recvTS; /* last Recv */
uint32_t lastTransfers; /* data recvd in last second */
uint32_t nResets; /* count to disable non-existant files */
};
class ftFileStatus
@ -135,6 +119,7 @@ public:
//interface to download controller
bool setFileSources(std::list<std::string> peerIds);
bool addFileSource(std::string peerId);
bool setPeerState(std::string peerId,uint32_t state,uint32_t maxRate); //state = ONLINE/OFFLINE
bool getFileSources(std::list<std::string> &peerIds);
bool getPeerState(std::string peerId,uint32_t &state,uint32_t &tfRate);