First commit for the turtle download. It works without perturbating RS traffic, but still needs some (internal) smoothing

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@1275 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2009-05-26 21:42:45 +00:00
parent c298de7f15
commit 1046bdd846
18 changed files with 1603 additions and 806 deletions

View file

@ -43,6 +43,8 @@
#include "ft/ftdatamultiplex.h"
#include "ft/ftextralist.h"
#include "turtle/p3turtle.h"
#include "util/rsdir.h"
#include "pqi/p3connmgr.h"
@ -76,17 +78,48 @@ ftFileControl::ftFileControl(std::string fname,
}
ftController::ftController(CacheStrapper *cs, ftDataMultiplex *dm, std::string configDir)
:CacheTransfer(cs), p3Config(CONFIG_TYPE_FT_CONTROL), mDataplex(dm), mFtActive(false)
:CacheTransfer(cs), p3Config(CONFIG_TYPE_FT_CONTROL), mDataplex(dm), mFtActive(false),mTurtle(NULL)
{
/* TODO */
}
void ftController::setTurtleRouter(p3turtle *pt)
{
mTurtle = pt ;
}
void ftController::setFtSearchNExtra(ftSearch *search, ftExtraList *list)
{
mSearch = search;
mExtraList = list;
}
void ftController::addFileSource(const std::string& hash,const std::string& peer_id)
{
RsStackMutex stack(ctrlMutex); /******* LOCKED ********/
std::map<std::string, ftFileControl>::iterator it;
std::map<std::string, ftFileControl> currentDownloads = *(&mDownloads);
#ifdef CONTROL_DEBUG
std::cerr << "ftController: Adding source " << peer_id << " to current download hash=" << hash ;
#endif
for(it = currentDownloads.begin(); it != currentDownloads.end(); it++)
if(it->first == hash)
{
it->second.mTransfer->addFileSource(peer_id);
// setPeerState(it->second.mTransfer, peer_id, rate, mConnMgr->isOnline(peer_id));
#ifdef CONTROL_DEBUG
std::cerr << "... added." << std::endl ;
#endif
return ;
}
#ifdef CONTROL_DEBUG
std::cerr << "... not added: hash not found." << std::endl ;
#endif
}
void ftController::run()
{
@ -134,26 +167,35 @@ void ftController::run()
std::cerr << std::endl;
#endif
if (it->second.mTransfer) {
(it->second.mTransfer)->tick();
//check if a cache file is downloaded, if the case, timeout the transfer after TIMOUT_CACHE_FILE_TRANSFER
if ((it->second).mFlags & RS_FILE_HINTS_CACHE) {
if (it->second.mTransfer)
{
#ifdef CONTROL_DEBUG
std::cerr << "ftController::run() cache transfer found. age of this tranfer is :" << (int)(time(NULL) - (it->second).mCreateTime);
std::cerr << "\tTicking mTransfer: " << (void*)it->second.mTransfer;
std::cerr << std::endl;
#endif
if ((time(NULL) - (it->second).mCreateTime) > TIMOUT_CACHE_FILE_TRANSFER) {
(it->second.mTransfer)->tick();
//check if a cache file is downloaded, if the case, timeout the transfer after TIMOUT_CACHE_FILE_TRANSFER
if ((it->second).mFlags & RS_FILE_HINTS_CACHE) {
#ifdef CONTROL_DEBUG
std::cerr << "ftController::run() cache transfer to old. Cancelling transfer. Hash :" << (it->second).mHash;
std::cerr << std::endl;
std::cerr << "ftController::run() cache transfer found. age of this tranfer is :" << (int)(time(NULL) - (it->second).mCreateTime);
std::cerr << std::endl;
#endif
this->FileCancel((it->second).mHash);
if ((time(NULL) - (it->second).mCreateTime) > TIMOUT_CACHE_FILE_TRANSFER) {
#ifdef CONTROL_DEBUG
std::cerr << "ftController::run() cache transfer to old. Cancelling transfer. Hash :" << (it->second).mHash << ", time=" << (it->second).mCreateTime << ", now = " << time(NULL) ;
std::cerr << std::endl;
#endif
this->FileCancel((it->second).mHash);
}
}
}
}
#ifdef CONTROL_DEBUG
else
std::cerr << "No mTransfer for this hash." << std::endl ;
#endif
}
}
@ -618,6 +660,7 @@ bool ftController::FileRequest(std::string fname, std::string hash,
/* do a source search - for any extra sources */
if (mSearch->search(hash, size,
RS_FILE_HINTS_REMOTE |
RS_FILE_HINTS_TURTLE |
RS_FILE_HINTS_SPEC_ONLY, info))
{
/* do something with results */
@ -729,7 +772,7 @@ bool ftController::setPeerState(ftTransferModule *tm, std::string id,
#endif
tm->setPeerState(id, PQIPEER_IDLE, maxrate);
}
else if (online)
else if (online || mTurtle->isOnline(id))
{
#ifdef CONTROL_DEBUG
std::cerr << "ftController::setPeerState()";
@ -1087,6 +1130,9 @@ void ftController::statusChange(const std::list<pqipeer> &plist)
std::map<std::string, ftFileControl>::iterator it;
std::list<pqipeer>::const_iterator pit;
std::list<pqipeer> vlist ;
mTurtle->getVirtualPeersList(vlist) ;
#ifdef CONTROL_DEBUG
std::cerr << "ftController::statusChange()";
std::cerr << std::endl;
@ -1126,6 +1172,40 @@ void ftController::statusChange(const std::list<pqipeer> &plist)
std::cerr << " had something happen to it: ";
std::cerr << pit-> actions;
std::cerr << std::endl;
#endif
setPeerState(it->second.mTransfer, pit->id, rate, false);
}
}
// Now also look at turtle virtual peers.
//
for(pit = vlist.begin(); pit != vlist.end(); pit++)
{
#ifdef CONTROL_DEBUG
std::cerr << "Peer: " << pit->id;
#endif
if (pit->actions & RS_PEER_CONNECTED)
{
#ifdef CONTROL_DEBUG
std::cerr << " is Newly Connected!";
std::cerr << std::endl;
#endif
setPeerState(it->second.mTransfer, pit->id, rate, true);
}
else if (pit->actions & RS_PEER_DISCONNECTED)
{
#ifdef CONTROL_DEBUG
std::cerr << " is Just disconnected!";
std::cerr << std::endl;
#endif
setPeerState(it->second.mTransfer, pit->id, rate, false);
}
else
{
#ifdef CONTROL_DEBUG
std::cerr << " had something happen to it: ";
std::cerr << pit-> actions;
std::cerr << std::endl;
#endif
setPeerState(it->second.mTransfer, pit->id, rate, false);
}

View file

@ -44,6 +44,7 @@ class ftFileProvider;
class ftSearch;
class ftExtraList;
class ftDataMultiplex;
class p3turtle ;
#include "dbase/cachestrapper.h"
#include "util/rsthreads.h"
@ -114,6 +115,7 @@ class ftController: public CacheTransfer, public RsThread, public pqiMonitor, pu
ftController(CacheStrapper *cs, ftDataMultiplex *dm, std::string configDir);
void setFtSearchNExtra(ftSearch *, ftExtraList *);
void setTurtleRouter(p3turtle *) ;
bool activate();
virtual void run();
@ -159,6 +161,7 @@ virtual bool CancelCacheFile(RsPeerId id, std::string path, std::string hash, ui
/* pqiMonitor callback (also provided mConnMgr pointer!) */
public:
virtual void statusChange(const std::list<pqipeer> &plist);
void addFileSource(const std::string& hash,const std::string& peer_id) ;
/* p3Config Interface */
@ -184,6 +187,7 @@ bool setPeerState(ftTransferModule *tm, std::string id,
ftSearch *mSearch;
ftDataMultiplex *mDataplex;
ftExtraList *mExtraList;
p3turtle *mTurtle ;
RsMutex ctrlMutex;

View file

@ -413,7 +413,7 @@ bool ftDataMultiplex::locked_handleServerRequest(ftFileProvider *provider,
std::string peerId, std::string hash, uint64_t size,
uint64_t offset, uint32_t chunksize)
{
void *data = malloc(size);
void *data = malloc(chunksize);
#ifdef MPLEX_DEBUG
std::cerr << "ftDataMultiplex::locked_handleServerRequest()";

View file

@ -32,6 +32,7 @@ const int ftserverzone = 29539;
#include "ft/ftcontroller.h"
#include "ft/ftfileprovider.h"
#include "ft/ftdatamultiplex.h"
#include "turtle/p3turtle.h"
// Includes CacheStrapper / FiMonitor / FiStore for us.
@ -114,7 +115,7 @@ void ftServer::SetupFtServer(NotifyBase *cb)
mFtSearch = new ftFileSearch();
/* Transport */
mFtDataplex = new ftDataMultiplex(ownId, this, mFtSearch);
mFtDataplex = new ftDataMultiplex(ownId, this, mFtSearch);
/* make Controller */
mFtController = new ftController(mCacheStrapper, mFtDataplex, mConfigPath);
@ -144,6 +145,13 @@ void ftServer::SetupFtServer(NotifyBase *cb)
return;
}
void ftServer::connectToTurtleRouter(p3turtle *fts)
{
mTurtleRouter = fts ;
mFtSearch->addSearchMode(fts, RS_FILE_HINTS_TURTLE);
mFtController->setTurtleRouter(fts) ;
}
void ftServer::StartupThreads()
{
@ -533,22 +541,27 @@ bool ftServer::loadConfigMap(std::map<std::string, std::string> &configMap)
bool ftServer::sendDataRequest(std::string peerId, std::string hash,
uint64_t size, uint64_t offset, uint32_t chunksize)
{
/* create a packet */
/* push to networking part */
RsFileRequest *rfi = new RsFileRequest();
if(mTurtleRouter->isTurtlePeer(peerId))
mTurtleRouter->sendDataRequest(peerId,hash,size,offset,chunksize) ;
else
{
/* create a packet */
/* push to networking part */
RsFileRequest *rfi = new RsFileRequest();
/* id */
rfi->PeerId(peerId);
/* id */
rfi->PeerId(peerId);
/* file info */
rfi->file.filesize = size;
rfi->file.hash = hash; /* ftr->hash; */
/* file info */
rfi->file.filesize = size;
rfi->file.hash = hash; /* ftr->hash; */
/* offsets */
rfi->fileoffset = offset; /* ftr->offset; */
rfi->chunksize = chunksize; /* ftr->chunk; */
/* offsets */
rfi->fileoffset = offset; /* ftr->offset; */
rfi->chunksize = chunksize; /* ftr->chunk; */
mP3iface->SendFileRequest(rfi);
mP3iface->SendFileRequest(rfi);
}
return true;
}
@ -558,8 +571,7 @@ bool ftServer::sendDataRequest(std::string peerId, std::string hash,
const uint32_t MAX_FT_CHUNK = 8 * 1024; /* 16K */
/* Server Send */
bool ftServer::sendData(std::string peerId, std::string hash, uint64_t size,
uint64_t baseoffset, uint32_t chunksize, void *data)
bool ftServer::sendData(std::string peerId, std::string hash, uint64_t size, uint64_t baseoffset, uint32_t chunksize, void *data)
{
/* create a packet */
/* push to networking part */
@ -587,37 +599,40 @@ bool ftServer::sendData(std::string peerId, std::string hash, uint64_t size,
/******** New Serialiser Type *******/
RsFileData *rfd = new RsFileData();
if(mTurtleRouter->isTurtlePeer(peerId))
mTurtleRouter->sendFileData(peerId,hash,size,baseoffset+offset,chunk,&(((uint8_t *) data)[offset])) ;
else
{
RsFileData *rfd = new RsFileData();
/* set id */
rfd->PeerId(peerId);
/* set id */
rfd->PeerId(peerId);
/* file info */
rfd->fd.file.filesize = size;
rfd->fd.file.hash = hash;
rfd->fd.file.name = ""; /* blank other data */
rfd->fd.file.path = "";
rfd->fd.file.pop = 0;
rfd->fd.file.age = 0;
/* file info */
rfd->fd.file.filesize = size;
rfd->fd.file.hash = hash;
rfd->fd.file.name = ""; /* blank other data */
rfd->fd.file.path = "";
rfd->fd.file.pop = 0;
rfd->fd.file.age = 0;
rfd->fd.file_offset = baseoffset + offset;
rfd->fd.file_offset = baseoffset + offset;
/* file data */
rfd->fd.binData.setBinData(
&(((uint8_t *) data)[offset]), chunk);
/* file data */
rfd->fd.binData.setBinData( &(((uint8_t *) data)[offset]), chunk);
/* print the data pointer */
mP3iface->SendFileData(rfd);
/* print the data pointer */
#ifdef SERVER_DEBUG
std::cerr << "ftServer::sendData() Packet: " << std::endl;
std::cerr << " offset: " << rfd->fd.file_offset;
std::cerr << " chunk: " << chunk;
std::cerr << " len: " << rfd->fd.binData.bin_len;
std::cerr << " data: " << rfd->fd.binData.bin_data;
std::cerr << std::endl;
std::cerr << "ftServer::sendData() Packet: " << std::endl;
std::cerr << " offset: " << rfd->fd.file_offset;
std::cerr << " chunk: " << chunk;
std::cerr << " len: " << rfd->fd.binData.bin_len;
std::cerr << " data: " << rfd->fd.binData.bin_data;
std::cerr << std::endl;
#endif
mP3iface->SendFileData(rfd);
}
offset += chunk;
tosend -= chunk;

View file

@ -50,7 +50,6 @@
#include "pqi/pqi.h"
#include "pqi/p3cfgmgr.h"
class p3ConnectMgr;
class p3AuthMgr;
@ -67,6 +66,7 @@ class ftExtraList;
class ftFileSearch;
class ftDataMultiplex;
class p3turtle;
class ftServer: public RsFiles, public ftDataSend, public RsThread
{
@ -94,6 +94,7 @@ std::string OwnId();
/* Final Setup (once everything is assigned) */
//void SetupFtServer();
void SetupFtServer(NotifyBase *cb);
void connectToTurtleRouter(p3turtle *p) ;
void StartupThreads();
@ -105,6 +106,11 @@ virtual void run();
/************** (Implements RsFiles) ***************************/
/***************************************************************/
// member access
ftDataMultiplex *getMultiplexer() const { return mFtDataplex ; }
ftController *getController() const { return mFtController ; }
/***
* Control of Downloads
***/
@ -227,6 +233,7 @@ bool loadConfigMap(std::map<std::string, std::string> &configMap);
ftExtraList *mFtExtra;
ftDataMultiplex *mFtDataplex;
p3turtle *mTurtleRouter ;
ftFileSearch *mFtSearch;

View file

@ -422,6 +422,7 @@ bool ftTransferModule::completeFileTransfer()
int ftTransferModule::tick()
{
queryInactive();
#ifdef FT_DEBUG
{
RsStackMutex stack(tfMtx); /******* STACK LOCKED ******/
@ -444,9 +445,6 @@ int ftTransferModule::tick()
}
#endif
queryInactive();
uint32_t flags = 0;
{
RsStackMutex stack(tfMtx); /******* STACK LOCKED ******/