Integration of ft into a working system.

* Made ftTransferModule compile.
 * bugfixes to make ftserver1test work.
 * New P3Pipe / P3Hub ...
 * Added Test Notes.
 * First functions added to ftcontroller
 * added isOnline to p3ConnectMgr.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@698 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2008-08-17 15:23:11 +00:00
parent c4a3792500
commit eb8dacc798
15 changed files with 647 additions and 143 deletions

View File

@ -34,6 +34,8 @@
* #define CS_DEBUG 1
***/
#define CS_DEBUG 1
bool operator<(const CacheId &a, const CacheId &b)
{
if (a.type == b.type)

View File

@ -0,0 +1,38 @@
Features that need to be tested, and which test checks it.
-----------------------------------------------------------
(*) Transfer Related
(*) Search Related
(*) ExtraList Related
(*) Cache Related
(*) Config / Storage.
Change Config Directory works dynamically. (no restart required).
(*) Miscelleous
-----------------------------------------------------------
-----------------------------------------------------------
Actual Tests, and what they check.
-----------------------------------------------------------
ftserver1test.cc
==================
Used to develop test framework.
Tests.
1) Test Framework. OK
2) File Indexing. OK
3) Cache Packet Exchange. OK
4) Cache downloads (including loopback). FAILS
5) FileIndex Store loading.
6) Basic Search.

View File

@ -47,12 +47,30 @@
#include "pqi/p3connmgr.h"
#define CONTROL_DEBUG 1
#warning CONFIG_FT_CONTROL Not defined in p3cfgmgr.h
const uint32_t CONFIG_FT_CONTROL = 1;
ftFileControl::ftFileControl()
:mTransfer(NULL), mCreator(NULL),
mState(0), mSize(0), mFlags(0)
{
return;
}
ftFileControl::ftFileControl(std::string fname, uint64_t size,
std::string hash, uint32_t flags,
ftFileCreator *fc, ftTransferModule *tm)
:mTransfer(tm), mCreator(fc), mState(0), mHash(hash),
mName(fname), mSize(size), mFlags(0)
{
return;
}
ftController::ftController(CacheStrapper *cs, ftDataMultiplex *dm, std::string configDir)
:CacheTransfer(cs), p3Config(CONFIG_FT_CONTROL)
:CacheTransfer(cs), p3Config(CONFIG_FT_CONTROL), mDataplex(dm)
{
/* TODO */
}
@ -65,6 +83,20 @@ void ftController::setFtSearch(ftSearch *search)
void ftController::run()
{
/* check the queues */
std::cerr << "ftController::run()";
std::cerr << std::endl;
/* tick the transferModules */
RsStackMutex stack(ctrlMutex); /******* LOCKED ********/
std::map<std::string, ftFileControl>::iterator it;
for(it = mDownloads.begin(); it != mDownloads.end(); it++)
{
std::cerr << "\tTicking: " << it->first;
std::cerr << std::endl;
(it->second.mTransfer)->tick();
}
}
@ -129,28 +161,47 @@ bool ftController::FileRequest(std::string fname, std::string hash,
uint64_t size, std::string dest, uint32_t flags,
std::list<std::string> &srcIds)
{
#if 0 /*** FIX ME !!!**************/
/* check if we have the file */
FileInfo info;
std::list<std::string>::iterator it;
if (mSearch->search(hash, size,
RS_FILE_HINTS_LOCAL |
RS_FILE_HINTS_EXTRA |
RS_FILE_HINTS_SPEC_ONLY, info))
#ifdef CONTROL_DEBUG
std::cerr << "ftController::FileRequest(" << fname << ",";
std::cerr << hash << "," << size << "," << dest << ",";
std::cerr << flags << ",<";
for(it = srcIds.begin(); it != srcIds.end(); it++)
{
/* have it already */
/* add in as completed transfer */
return true;
std::cerr << *it << ",";
}
std::cerr << ">)";
std::cerr << std::endl;
#endif
/* do a source search - for any extra sources */
if (mSearch->search(hash, size,
RS_FILE_HINTS_REMOTE |
RS_FILE_HINTS_SPEC_ONLY, info))
if (flags | RS_FILE_HINTS_NO_SEARCH)
{
/* do something with results */
/* no search */
}
else
{
if (mSearch->search(hash, size,
RS_FILE_HINTS_LOCAL |
RS_FILE_HINTS_EXTRA |
RS_FILE_HINTS_SPEC_ONLY, info))
{
/* have it already */
/* add in as completed transfer */
return true;
}
/* do a source search - for any extra sources */
if (mSearch->search(hash, size,
RS_FILE_HINTS_REMOTE |
RS_FILE_HINTS_SPEC_ONLY, info))
{
/* do something with results */
}
}
std::map<std::string, ftTransferModule *> mTransfers;
@ -172,16 +223,26 @@ bool ftController::FileRequest(std::string fname, std::string hash,
tm->setFileSources(srcIds);
/* get current state for transfer module */
std::list<std::string>::iterator it;
for(it = srcIds.begin(); it != srcIds.end(); it++)
{
if (mConnMgr->isOnline(*it))
{
tm->setPeer(*it, RS_FILE_RATE_TRICKLE | RS_FILE_PEER_ONLINE);
#ifdef CONTROL_DEBUG
std::cerr << "ftController::FileRequest()";
std::cerr << *it << " is Online";
std::cerr << std::endl;
#endif
tm->setPeerState(*it, RS_FILE_RATE_TRICKLE |
RS_FILE_PEER_ONLINE, 10000);
}
else
{
tm->setPeer(*it, RS_FILE_PEER_OFFLINE);
#ifdef CONTROL_DEBUG
std::cerr << "ftController::FileRequest()";
std::cerr << *it << " is Offline";
std::cerr << std::endl;
#endif
tm->setPeerState(*it, RS_FILE_PEER_OFFLINE, 10000);
}
}
@ -190,8 +251,6 @@ bool ftController::FileRequest(std::string fname, std::string hash,
mDownloads[hash] = ftfc;
mSlowQueue.push_back(hash);
#endif
}
@ -370,3 +429,33 @@ bool ftController::loadList(std::list<RsItem *> load)
}
/* Cache Interface */
bool ftController::RequestCacheFile(RsPeerId id, std::string path, std::string hash, uint64_t size)
{
#ifdef CONTROL_DEBUG
std::cerr << "ftController::RequestCacheFile(" << id << ",";
std::cerr << path << "," << hash << "," << size << ")";
std::cerr << std::endl;
#endif
/* Request File */
std::list<std::string> ids;
ids.push_back(id);
FileRequest(hash, hash, size, path,
RS_FILE_HINTS_CACHE | RS_FILE_HINTS_NO_SEARCH, ids);
}
bool ftController::CancelCacheFile(RsPeerId id, std::string path, std::string hash, uint64_t size)
{
#ifdef CONTROL_DEBUG
std::cerr << "ftController::CancelCacheFile(" << id << ",";
std::cerr << path << "," << hash << "," << size << ")";
std::cerr << std::endl;
#endif
}

View File

@ -67,6 +67,7 @@ class ftFileControl
std::string mHash;
std::string mName;
uint64_t mSize;
uint32_t mFlags;
};
@ -103,6 +104,15 @@ std::string getDownloadDirectory();
std::string getPartialsDirectory();
bool FileDetails(std::string hash, FileInfo &info);
/***************************************************************/
/********************** Cache Transfer *************************/
/***************************************************************/
protected:
virtual bool RequestCacheFile(RsPeerId id, std::string path, std::string hash, uint64_t size);
virtual bool CancelCacheFile(RsPeerId id, std::string path, std::string hash, uint64_t size);
/***************************************************************/
/********************** Controller Access **********************/
/***************************************************************/
@ -143,6 +153,46 @@ bool completeFile(std::string hash);
std::string mConfigPath;
std::string mDownloadPath;
std::string mPartialPath;
/**** SPEED QUEUES ****/
std::list<std::string> mSlowQueue;
std::list<std::string> mStreamQueue;
std::list<std::string> mFastQueue;
};
#endif
#if 0
class CacheTransfer
{
public:
CacheTransfer(CacheStrapper *cs) :strapper(cs) { return; }
virtual ~CacheTransfer() {}
/* upload side of things .... searches through CacheStrapper. */
bool FindCacheFile(std::string hash, std::string &path, uint64_t &size);
/* At the download side RequestCache() => overloaded RequestCacheFile()
* the class should then call CompletedCache() or FailedCache()
*/
bool RequestCache(CacheData &data, CacheStore *cbStore); /* request from CacheStore */
protected:
/* to be overloaded */
virtual bool RequestCacheFile(RsPeerId id, std::string path, std::string hash, uint64_t size);
virtual bool CancelCacheFile(RsPeerId id, std::string path, std::string hash, uint64_t size);
bool CompletedCache(std::string hash); /* internal completion -> does cb */
bool FailedCache(std::string hash); /* internal completion -> does cb */
private:
CacheStrapper *strapper;
std::map<std::string, CacheData> cbData;
std::map<std::string, CacheStore *> cbStores;
};
#endif

View File

@ -45,6 +45,12 @@ const int ftserverzone = 29539;
#include <iostream>
#include <sstream>
/***
* #define SERVER_DEBUG 1
***/
#define SERVER_DEBUG 1
/* Setup */
ftServer::ftServer(p3AuthMgr *authMgr, p3ConnectMgr *connMgr)
:mAuthMgr(authMgr), mConnMgr(connMgr)
@ -55,11 +61,22 @@ ftServer::ftServer(p3AuthMgr *authMgr, p3ConnectMgr *connMgr)
void ftServer::setConfigDirectory(std::string path)
{
mConfigPath = path;
/* Must update the sub classes ... if they exist
* TODO.
*/
std::string localcachedir = mConfigPath + "/cache/local";
std::string remotecachedir = mConfigPath + "/cache/remote";
//mFiStore -> setCacheDir(remotecachedir);
//mFiMon -> setCacheDir(localcachedir);
}
void ftServer::setPQInterface(PQInterface *pqi)
void ftServer::setP3Interface(P3Interface *pqi)
{
mP3iface = pqi;
}
/* Control Interface */
@ -79,21 +96,9 @@ void ftServer::SetupFtServer(NotifyBase *cb)
std::string remotecachedir = mConfigPath + "/cache/remote";
std::string ownId = mConnMgr->getOwnId();
mFiStore = new ftFiStore(mCacheStrapper, mFtController, cb, ownId, remotecachedir);
mFiMon = new ftFiMonitor(mCacheStrapper, localcachedir, ownId);
/* now add the set to the cachestrapper */
CachePair cp(mFiMon, mFiStore, CacheId(RS_SERVICE_TYPE_FILE_INDEX, 0));
mCacheStrapper -> addCachePair(cp);
/* extras List */
/* search/extras List */
mFtExtra = new ftExtraList();
mFtSearch = new ftFileSearch();
mFtSearch->addSearchMode(mCacheStrapper, RS_FILE_HINTS_CACHE);
mFtSearch->addSearchMode(mFtExtra, RS_FILE_HINTS_EXTRA);
mFtSearch->addSearchMode(mFiMon, RS_FILE_HINTS_LOCAL);
mFtSearch->addSearchMode(mFiStore, RS_FILE_HINTS_REMOTE);
/* Transport */
mFtDataplex = new ftDataMultiplex(this, mFtSearch);
@ -105,8 +110,26 @@ void ftServer::SetupFtServer(NotifyBase *cb)
mFtController->setPartialsDirectory(tmppath);
mFtController->setDownloadDirectory(tmppath);
/* Make Cache Source/Store */
mFiStore = new ftFiStore(mCacheStrapper, mFtController, cb, ownId, remotecachedir);
mFiMon = new ftFiMonitor(mCacheStrapper, localcachedir, ownId);
/* now add the set to the cachestrapper */
CachePair cp(mFiMon, mFiStore, CacheId(RS_SERVICE_TYPE_FILE_INDEX, 0));
mCacheStrapper -> addCachePair(cp);
/* complete search setup */
mFtSearch->addSearchMode(mCacheStrapper, RS_FILE_HINTS_CACHE);
mFtSearch->addSearchMode(mFtExtra, RS_FILE_HINTS_EXTRA);
mFtSearch->addSearchMode(mFiMon, RS_FILE_HINTS_LOCAL);
mFtSearch->addSearchMode(mFiStore, RS_FILE_HINTS_REMOTE);
mConnMgr->addMonitor(mFtController);
mConnMgr->addMonitor(mCacheStrapper);
return;
}
@ -126,11 +149,11 @@ void ftServer::StartupThreads()
//mFiMon->setSharedDirectories(dbase_dirs);
mFiMon->start();
/* start own thread */
//start();
/* Controller thread */
mFtController->start();
/* start own thread */
start();
}
CacheStrapper *ftServer::getCacheStrapper()
@ -143,6 +166,14 @@ CacheTransfer *ftServer::getCacheTransfer()
return mFtController;
}
void ftServer::run()
{
while(1)
{
sleep(1);
}
}
/***************************************************************/
/********************** RsFiles Interface **********************/
@ -471,6 +502,10 @@ int ftServer::tick()
if (mP3iface == NULL)
{
#ifdef SERVER_DEBUG
std::cerr << "ftServer::tick() ERROR: mP3iface == NULL";
#endif
std::ostringstream out;
rslog(RSL_DEBUG_BASIC, ftserverzone,
"filedexserver::tick() Invalid Interface()");
@ -518,10 +553,14 @@ bool ftServer::handleCacheData()
int i = 0;
int i_init = 0;
//std::cerr << "filedexserver::handleInputQueues()" << std::endl;
#ifdef SERVER_DEBUG
std::cerr << "ftServer::handleCacheData()" << std::endl;
#endif
while((ci = mP3iface -> GetSearchResult()) != NULL)
{
//std::cerr << "filedexserver::handleInputQueues() Recvd SearchResult (CacheResponse!)" << std::endl;
#ifdef SERVER_DEBUG
std::cerr << "ftServer::handleCacheData() Recvd SearchResult (CacheResponse!)" << std::endl;
std::ostringstream out;
if (i++ == i_init)
{
@ -529,6 +568,7 @@ bool ftServer::handleCacheData()
}
ci -> print(out);
rslog(RSL_DEBUG_BASIC, ftserverzone, out.str());
#endif
/* these go to the CacheStrapper! */
CacheData data;
@ -548,11 +588,13 @@ bool ftServer::handleCacheData()
i_init = i;
while((cr = mP3iface -> RequestedSearch()) != NULL)
{
#ifdef SERVER_DEBUG
/* just delete these */
std::ostringstream out;
out << "Requested Search:" << std::endl;
cr -> print(out);
rslog(RSL_DEBUG_BASIC, ftserverzone, out.str());
#endif
delete cr;
}
@ -606,7 +648,7 @@ bool ftServer::handleFileData()
while((fr = mP3iface -> GetFileRequest()) != NULL )
{
#ifdef SERVER_DEBUG
std::cerr << "filedexserver::handleInputQueues() Recvd ftFiler Request" << std::endl;
std::cerr << "ftServer::handleFileData() Recvd ftFiler Request" << std::endl;
std::ostringstream out;
if (i == i_init)
{
@ -631,7 +673,7 @@ FileInfo(ffr);
while((fd = mP3iface -> GetFileData()) != NULL )
{
#ifdef SERVER_DEBUG
//std::cerr << "filedexserver::handleInputQueues() Recvd ftFiler Data" << std::endl;
std::cerr << "ftServer::handleFileData() Recvd ftFiler Data" << std::endl;
std::ostringstream out;
if (i == i_init)
{

View File

@ -68,7 +68,7 @@ class ftFileSearch;
class ftDataMultiplex;
class ftServer: public RsFiles, public ftDataSend
class ftServer: public RsFiles, public ftDataSend, public RsThread
{
public:
@ -82,8 +82,7 @@ class ftServer: public RsFiles, public ftDataSend
/* Assign important variables */
void setConfigDirectory(std::string path);
void setPQInterface(PQInterface *pqi);
void setP3Interface(P3Interface *pqi);
/* add Config Items (Extra, Controller) */
void addConfigComponents(p3ConfigMgr *mgr);
@ -97,6 +96,9 @@ void SetupFtServer(NotifyBase *cb);
void StartupThreads();
/* own thread */
virtual void run();
/***************************************************************/
/*************** Control Interface *****************************/
/************** (Implements RsFiles) ***************************/

View File

@ -38,11 +38,12 @@
#include "pqi/p3authmgr.h"
#include "pqi/p3connmgr.h"
#include "util/rsdebug.h"
#include "ft/pqitestor.h"
#include "util/rsdir.h"
#include <sstream>
@ -59,13 +60,15 @@ int main(int argc, char **argv)
{
int c;
uint32_t period = 1;
uint32_t dPeriod = 600; /* default 10 minutes */
uint32_t debugLevel = 5;
bool debugStderr = true;
std::list<std::string> fileList;
std::list<std::string> peerIds;
std::list<ftServer *> mFtServers;
std::map<std::string, ftServer *> mFtServers;
std::map<std::string, p3ConnectMgr *> mConnMgrs;
while(-1 != (c = getopt(argc, argv, "d:p:")))
while(-1 != (c = getopt(argc, argv, "d:p:s")))
{
switch (c)
{
@ -73,7 +76,10 @@ int main(int argc, char **argv)
peerIds.push_back(optarg);
break;
case 'd':
dPeriod = atoi(optarg);
debugLevel = atoi(optarg);
break;
case 's':
debugStderr = true;
break;
default:
usage(argv[0]);
@ -81,6 +87,9 @@ int main(int argc, char **argv)
}
}
/* do logging */
setOutputLevel(debugLevel);
if (optind >= argc)
{
std::cerr << "Missing Files" << std::endl;
@ -106,7 +115,7 @@ int main(int argc, char **argv)
std::list<pqiAuthDetails> baseFriendList, friendList;
std::list<pqiAuthDetails>::iterator fit;
PQIHub *testHub = new PQIHub();
P3Hub *testHub = new P3Hub();
testHub->start();
/* Setup Base Friend Info */
@ -125,6 +134,14 @@ int main(int argc, char **argv)
std::cerr << std::endl;
}
std::ostringstream pname;
pname << "/tmp/rstst-" << time(NULL);
std::string basepath = pname.str();
RsDirUtil::checkCreateDirectory(basepath);
for(it = peerIds.begin(); it != peerIds.end(); it++)
{
friendList = baseFriendList;
@ -140,6 +157,8 @@ int main(int argc, char **argv)
p3AuthMgr *authMgr = new p3DummyAuthMgr(*it, friendList);
p3ConnectMgr *connMgr = new p3ConnectMgr(authMgr);
mConnMgrs[*it] = connMgr;
for(fit = friendList.begin(); fit != friendList.end(); fit++)
{
@ -147,33 +166,62 @@ int main(int argc, char **argv)
connMgr->addFriend(fit->id);
}
PQIPipe *pipe = new PQIPipe(*it);
P3Pipe *pipe = new P3Pipe(); //(*it);
/* add server */
ftServer *server;
server = new ftServer(authMgr, connMgr);
mFtServers[*it] = server;
PQInterface *pqi = NULL;
server->setPQInterface(pipe);
server->setP3Interface(pipe);
std::string configpath = basepath + "/" + *it;
RsDirUtil::checkCreateDirectory(configpath);
std::string cachepath = configpath + "/cache";
RsDirUtil::checkCreateDirectory(cachepath);
std::string localpath = cachepath + "/local";
RsDirUtil::checkCreateDirectory(localpath);
std::string remotepath = cachepath + "/remote";
RsDirUtil::checkCreateDirectory(remotepath);
server->setConfigDirectory(configpath);
NotifyBase *base = NULL;
server->SetupFtServer(base);
testHub->addPQIPipe(*it, pipe, connMgr);
testHub->addP3Pipe(*it, pipe, connMgr);
server->StartupThreads();
//server->start();
/* setup any extra bits */
server->setSharedDirectories(fileList);
}
/* stick your real test here */
std::map<std::string, ftServer *>::iterator sit;
std::map<std::string, p3ConnectMgr *>::iterator cit;
while(1)
{
std::cerr << "ftserver1test::sleep()";
std::cerr << std::endl;
sleep(1);
/* tick the connmgrs */
for(sit = mFtServers.begin(); sit != mFtServers.end(); sit++)
{
/* update */
(sit->second)->tick();
}
for(cit = mConnMgrs.begin(); cit != mConnMgrs.end(); cit++)
{
/* update */
(cit->second)->tick();
}
}
}

View File

@ -43,7 +43,7 @@ ftTransferModule::ftTransferModule(ftFileCreator *fc, ftDataMultiplex *dm)
// Dummy for Testing (should be handled independantly for
// each peer.
mChunkSize = 10000;
//mChunkSize = 10000;
return;
}
@ -100,7 +100,7 @@ uint32_t ftTransferModule::getDataRate(std::string peerId)
if (mit == mOnlinePeers.end())
return 0;
else
return (mit->second).actualRate;
return (uint32_t) (mit->second).actualRate;
}
@ -135,15 +135,15 @@ void ftTransferModule::requestData(std::string peerId, uint64_t offset, uint32_t
}
bool ftTransferModule::getChunk(uint64_t &offset, uint32_t &chunk_size)
{
return mFileCreator->getMissingChunk(offset, chunk_size);
}
bool ftTransferModule::storeData(uint64_t offset, uint32_t chunk_size,void *data)
{
mFileCreator -> addFileData(offset, chunk_size, data);
return mFileCreator->getMissingChunk(offset, chunk_size);
}
bool ftTransferModule::storeData(uint64_t offset, uint32_t chunk_size,void *data)
{
return mFileCreator -> addFileData(offset, chunk_size, data);
}
void ftTransferModule::queryInactive()
{
#ifdef FT_DEBUG
@ -152,9 +152,11 @@ void ftTransferModule::queryInactive()
out<<std:endl;
#endif
int ts = time(NULL);
int offset,size,delta;
int ts = time(NULL);
uint64_t offset;
uint32_t size;
int delta;
std::map<std::string,peerInfo>::iterator mit;
for(mit = mOnlinePeers.begin(); mit != mOnlinePeers.end(); mit++)
{
@ -180,26 +182,26 @@ void ftTransferModule::queryInactive()
//file request has been sent to peer side, but no response received yet
case PQIPEER_DOWNLOADING:
if (ts - ((mit->second).lastTS) > PQIPEER_DOWNLOAD_CHECK)
requestData(mit->first, (mit->second).offset,(mit->second).size); //give a push
requestData(mit->first, (mit->second).offset,(mit->second).chunkSize); //give a push
actualRate += (mit->second).actualRate;
break;
//file response has been received or peer side is just ready for download
case PQIPEER_IDLE:
(mit->second).actualRate = (mit->second).size/(ts-(mit-second).lastTS);
if ((mit->second).actualRate < (mit->second).desireRate)
(mit->second).actualRate = (mit->second).chunkSize/(ts-(mit->second).lastTS);
if ((mit->second).actualRate < (mit->second).desiredRate)
{
size = (mit->second).size *2 ;
size = (mit->second).chunkSize * 2 ;
}
else
{
size = (mit->second).size * 0.9 ;
size = (uint32_t ) ((mit->second).chunkSize * 0.9) ;
}
if (getChunk(offset,size))
{
(mit->second).offset = offset;
(mit->second).size = size;
(mit->second).chunkSize = size;
(mit->second).lastTS = ts;
(mit->second).state = PQIPEER_DOWNLOADING;
requestData(mit->first,offset,size);
@ -242,10 +244,11 @@ bool ftTransferModule::resumeTransfer()
return 1;
}
bool ftTransferModule::completeFileTransfer()
{
}
bool ftTransferModule::completeFileTransfer()
{
return true;
}
int ftTransferModule::tick()
{
queryInactive();
@ -265,12 +268,12 @@ void ftTransferModule::adjustSpeed()
if (((mit->second).state == PQIPEER_DOWNLOADING)
|| ((mit->second).state == PQIPEER_IDLE))
{
if (actualRate < desiredRate) && ((mit->second).actualRate >= (mit->second).desiredRate)
if ((actualRate < desiredRate) && ((mit->second).actualRate >= (mit->second).desiredRate))
{
(mit->second).desiredRate *= 1.1;
}
if (actualRate > desiredRate) && ((mit->second).actualRate < (mit->second).desiredRate)
if ((actualRate > desiredRate) && ((mit->second).actualRate < (mit->second).desiredRate))
{
(mit->second).desiredRate *= 0.9;
}

View File

@ -57,24 +57,24 @@ class Request
uint32_t chunkSize;
};
class peerInfo
{
class peerInfo
{
public:
std::string peerId;
uint32_t state;
uint32_t desiredRate;
uint32_t actualRate;
//current file data request
std::string peerId;
uint32_t state;
double desiredRate;
double actualRate;
//current file data request
uint64_t offset;
uint32_t chunkSize;
//already received data size
uint32_t receivedSize;
time_t lastTS;
};
uint32_t chunkSize;
//already received data size
uint32_t receivedSize;
time_t lastTS;
};
class ftTransferModule
{
public:
@ -122,8 +122,8 @@ private:
std::map<std::string,peerInfo> mOnlinePeers;
bool mFlag; //1:transfer complete, 0: not complete
uint32_t desiredRate;
uint32_t actualRate;
double desiredRate;
double actualRate;
};
#endif //FT_TRANSFER_MODULE_HEADER

View File

@ -27,12 +27,12 @@
#include "pqi/p3connmgr.h"
PQIHub::PQIHub()
P3Hub::P3Hub()
{
return;
}
void PQIHub::addPQIPipe(std::string id, PQIPipe *pqi, p3ConnectMgr *mgr)
void P3Hub::addP3Pipe(std::string id, P3Pipe *pqi, p3ConnectMgr *mgr)
{
hubItem item(id, pqi, mgr);
@ -46,20 +46,20 @@ void PQIHub::addPQIPipe(std::string id, PQIPipe *pqi, p3ConnectMgr *mgr)
mPeers[id] = item;
/* tell all the other peers we are connected */
std::cerr << "PQIHub::addPQIPipe()";
std::cerr << "P3Hub::addPQIPipe()";
std::cerr << std::endl;
}
void PQIHub::run()
void P3Hub::run()
{
RsItem *item;
std::list<RsItem *> recvdQ;
std::list<RsItem *>::iterator lit;
while(1)
{
std::cerr << "PQIHub::run()";
std::cerr << "P3Hub::run()";
std::cerr << std::endl;
std::map<std::string, hubItem>::iterator it;
@ -67,9 +67,12 @@ void PQIHub::run()
{
while (NULL != (item = it->second.mPQI->PopSentItem()))
{
std::cerr << "PQIHub::run() recvd msg from: ";
std::cerr << "P3Hub::run() recvd msg from: ";
std::cerr << it->first;
std::cerr << std::endl;
item->print(std::cerr, 10);
std::cerr << std::endl;
recvdQ.push_back(item);
}
}
@ -83,13 +86,18 @@ void PQIHub::run()
std::cerr << "Failed to Find destination: " << pId;
std::cerr << std::endl;
}
std::cerr << "PQIHub::run() sending msg to: ";
std::cerr << "P3Hub::run() sending msg to: ";
std::cerr << it->first;
std::cerr << std::endl;
(*lit)->print(std::cerr, 10);
std::cerr << std::endl;
(it->second).mPQI->PushRecvdItem(*lit);
}
recvdQ.clear();
/* Tick the Connection Managers (normally done by rsserver)
*/
@ -159,3 +167,180 @@ RsItem *PQIPipe::GetItem()
}
/***** P3Pipe here *****/
int P3Pipe::SendAllItem(RsItem *item)
{
RsStackMutex stack(pipeMtx); /***** LOCK MUTEX ****/
mSentItems.push_back(item);
return 1;
}
RsItem *P3Pipe::PopSentItem()
{
RsStackMutex stack(pipeMtx); /***** LOCK MUTEX ****/
if (mSentItems.size() == 0)
{
return NULL;
}
RsItem *item = mSentItems.front();
mSentItems.pop_front();
return item;
}
int P3Pipe::PushRecvdItem(RsItem *item)
{
RsStackMutex stack(pipeMtx); /***** LOCK MUTEX ****/
RsCacheRequest *rcr;
RsCacheItem *rci;
RsFileRequest *rfr;
RsFileData *rfd;
RsRawItem *rri;
if (NULL != (rcr = dynamic_cast<RsCacheRequest *>(item)))
{
mRecvdRsCacheRequests.push_back(rcr);
}
else if (NULL != (rci = dynamic_cast<RsCacheItem *>(item)))
{
mRecvdRsCacheItems.push_back(rci);
}
else if (NULL != (rfr = dynamic_cast<RsFileRequest *>(item)))
{
mRecvdRsFileRequests.push_back(rfr);
}
else if (NULL != (rfd = dynamic_cast<RsFileData *>(item)))
{
mRecvdRsFileDatas.push_back(rfd);
}
else if (NULL != (rri = dynamic_cast<RsRawItem *>(item)))
{
mRecvdRsRawItems.push_back(rri);
}
return 1;
}
int P3Pipe::SearchSpecific(RsCacheRequest *item)
{
SendAllItem(item);
return 1;
}
int P3Pipe::SendSearchResult(RsCacheItem *item)
{
SendAllItem(item);
return 1;
}
int P3Pipe::SendFileRequest(RsFileRequest *item)
{
SendAllItem(item);
return 1;
}
int P3Pipe::SendFileData(RsFileData *item)
{
SendAllItem(item);
return 1;
}
int P3Pipe::SendRsRawItem(RsRawItem *item)
{
SendAllItem(item);
return 1;
}
// Cache Requests
RsCacheRequest *P3Pipe::RequestedSearch()
{
RsStackMutex stack(pipeMtx); /***** LOCK MUTEX ****/
if (mRecvdRsCacheRequests.size() == 0)
{
return NULL;
}
RsCacheRequest *item = mRecvdRsCacheRequests.front();
mRecvdRsCacheRequests.pop_front();
return item;
}
// Cache Results
RsCacheItem *P3Pipe::GetSearchResult()
{
RsStackMutex stack(pipeMtx); /***** LOCK MUTEX ****/
if (mRecvdRsCacheItems.size() == 0)
{
return NULL;
}
RsCacheItem *item = mRecvdRsCacheItems.front();
mRecvdRsCacheItems.pop_front();
return item;
}
// FileTransfer.
RsFileRequest *P3Pipe::GetFileRequest()
{
RsStackMutex stack(pipeMtx); /***** LOCK MUTEX ****/
if (mRecvdRsFileRequests.size() == 0)
{
return NULL;
}
RsFileRequest *item = mRecvdRsFileRequests.front();
mRecvdRsFileRequests.pop_front();
return item;
}
RsFileData *P3Pipe::GetFileData()
{
RsStackMutex stack(pipeMtx); /***** LOCK MUTEX ****/
if (mRecvdRsFileDatas.size() == 0)
{
return NULL;
}
RsFileData *item = mRecvdRsFileDatas.front();
mRecvdRsFileDatas.pop_front();
return item;
}
RsRawItem *P3Pipe::GetRsRawItem()
{
RsStackMutex stack(pipeMtx); /***** LOCK MUTEX ****/
if (mRecvdRsRawItems.size() == 0)
{
return NULL;
}
RsRawItem *item = mRecvdRsRawItems.front();
mRecvdRsRawItems.pop_front();
return item;
}

View File

@ -43,6 +43,8 @@
class hubItem;
class PQIPipe;
class PQIHub;
class P3Pipe;
class P3Hub;
class p3ConnectMgr;
@ -52,21 +54,21 @@ class hubItem
hubItem()
:mPQI(NULL), mConnMgr(NULL) { return; }
hubItem(std::string id, PQIPipe *pqi, p3ConnectMgr *mgr)
hubItem(std::string id, P3Pipe *pqi, p3ConnectMgr *mgr)
:mPeerId(id), mPQI(pqi), mConnMgr(mgr) { return; }
std::string mPeerId;
PQIPipe *mPQI;
P3Pipe *mPQI;
p3ConnectMgr *mConnMgr;
};
class PQIHub: public RsThread
class P3Hub: public RsThread
{
public:
PQIHub();
void addPQIPipe(std::string id, PQIPipe *, p3ConnectMgr *mgr);
P3Hub();
void addP3Pipe(std::string id, P3Pipe *, p3ConnectMgr *mgr);
virtual void run();
@ -99,5 +101,47 @@ private:
};
class P3Pipe: public P3Interface
{
public:
P3Pipe() {return; }
virtual ~P3Pipe() {return; }
virtual int tick() { return 1; }
virtual int status() { return 1; }
/* Overloaded from P3Interface */
virtual int SearchSpecific(RsCacheRequest *item);
virtual int SendSearchResult(RsCacheItem *item);
virtual int SendFileRequest(RsFileRequest *item);
virtual int SendFileData(RsFileData *item);
virtual int SendRsRawItem(RsRawItem *item);
virtual RsCacheRequest *RequestedSearch();
virtual RsCacheItem *GetSearchResult();
virtual RsFileRequest *GetFileRequest();
virtual RsFileData *GetFileData();
virtual RsRawItem *GetRsRawItem();
/* Lower Interface for PQIHub */
RsItem *PopSentItem();
int PushRecvdItem(RsItem *item);
private:
int SendAllItem(RsItem *item);
RsMutex pipeMtx;
std::list<RsItem *> mSentItems;
std::list<RsCacheRequest *> mRecvdRsCacheRequests;
std::list<RsCacheItem *> mRecvdRsCacheItems;
std::list<RsFileRequest *> mRecvdRsFileRequests;
std::list<RsFileData *> mRecvdRsFileDatas;
std::list<RsRawItem *> mRecvdRsRawItems;
};
#endif

View File

@ -1190,6 +1190,34 @@ bool p3ConnectMgr::isFriend(std::string id)
return (mFriendList.end() != mFriendList.find(id));
}
bool p3ConnectMgr::isOnline(std::string id)
{
RsStackMutex stack(connMtx); /****** STACK LOCK MUTEX *******/
std::map<std::string, peerConnectState>::iterator it;
if (mFriendList.end() != (it = mFriendList.find(id)))
{
#ifdef CONN_DEBUG
std::cerr << "p3ConnectMgr::isOnline(" << id;
std::cerr << ") is Friend, Online: ";
std::cerr << (it->second.state & RS_PEER_S_CONNECTED);
std::cerr << std::endl;
#endif
return (it->second.state & RS_PEER_S_CONNECTED);
}
else
{
#ifdef CONN_DEBUG
std::cerr << "p3ConnectMgr::isOnline(" << id;
std::cerr << ") is Not Friend";
std::cerr << std::endl;
#endif
/* not a friend */
}
return false;
}
bool p3ConnectMgr::getFriendNetStatus(std::string id, peerConnectState &state)
{
RsStackMutex stack(connMtx); /****** STACK LOCK MUTEX *******/

View File

@ -197,6 +197,7 @@ const std::string getOwnId();
bool getOwnNetStatus(peerConnectState &state);
bool isFriend(std::string id);
bool isOnline(std::string id);
bool getFriendNetStatus(std::string id, peerConnectState &state);
bool getOthersNetStatus(std::string id, peerConnectState &state);

View File

@ -70,6 +70,7 @@ const uint32_t RS_FILE_HINTS_DOWNLOAD = 0x00000010;
const uint32_t RS_FILE_HINTS_UPLOAD = 0x00000020;
const uint32_t RS_FILE_HINTS_SPEC_ONLY = 0x01000000;
const uint32_t RS_FILE_HINTS_NO_SEARCH = 0x02000000;
const uint32_t RS_FILE_EXTRA_DELETE = 0x0010;

View File

@ -566,35 +566,6 @@ int filedexserver::handleOutputQueues()
//std::cerr << "filedexserver::handleOutputQueues()" << std::endl;
int i = 0;
#if 0 /* no more cache queries -> results are pushed */
std::list<RsPeerId> ids;
std::list<RsPeerId>::iterator pit;
mCacheStrapper->sendCacheQuery(ids, time(NULL));
for(pit = ids.begin(); pit != ids.end(); pit++)
{
//std::cerr << "filedexserver::handleOutputQueues() Cache Query for: " << (*pit) << std::endl;
/* now create one! */
RsCacheRequest *cr = new RsCacheRequest();
cr->PeerId(*pit);
std::ostringstream out;
if (i++ == 0)
{
out << "Outgoing CacheStrapper -> SearchItem:" << std::endl;
}
cr -> print(out);
pqioutput(PQL_DEBUG_BASIC, fldxsrvrzone, out.str());
/* send it off */
pqisi -> SearchSpecific(cr);
}
#endif
/* now see if the filer has any data */
ftFileRequest *ftr;
while((ftr = ftFiler -> sendFileInfo()) != NULL)