More Improvements to FileTransfer:

* Added TlvShallowClear() to serialisers
 * Added RsQueueThread for periodic queue processes.
 * Completed ftDataMultiplex which replaces ftServer/ClientModules.
 * Added Server Queue / Thread to ftDataMultiplex.
 * Added ftdataplextest to exercise ftDataMultiplex.
 * Generalised ftFileSearch to handle an array of ftSearch classes.
 * Tweaked rsfiles.h #defines to match new ftFileSearch scheme.
 * Added Generic ftData Interfaces for Testing.
 * added ftDataSend/Recv Interfaces to ftServer + ftDataMultiplex respectively.
 * Completed much of ftServer (External Interface), but not yet done.
 * Extra debugging and small changes to ftExtraList
 * Integrated new ftTransferModule with minor interface changes.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@660 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2008-08-03 12:45:53 +00:00
parent d584516859
commit 5c6e558942
22 changed files with 1355 additions and 326 deletions

View file

@ -23,16 +23,56 @@
*
*/
#include "util/rsdebug.h"
const int ftserverzone = 29539;
#include "ft/ftserver.h"
#include "ft/ftextralist.h"
#include "ft/ftfilesearch.h"
#include "ft/ftcontroller.h"
#include "ft/ftdatamultiplex.h"
#include "dbase/cachestrapper.h"
#include "dbase/fimonitor.h"
#include "dbase/fistore.h"
#include "pqi/pqi.h"
#include "pqi/p3connmgr.h"
#include <iostream>
#include <sstream>
/* Setup */
ftServer::ftServer(CacheStrapper *cStrapper, p3ConnectMgr *connMgr)
:mCacheStrapper(cStrapper), mConnMgr(connMgr)
{
}
void ftServer::setConfigDirectory(std::string path)
{
mConfigPath = path;
}
void ftServer::setPQInterface(PQInterface *pqi)
{
}
/* Control Interface */
/* add Config Items (Extra, Controller) */
void ftServer::addConfigComponents(p3ConfigMgr *mgr)
{
/* NOT SURE ABOUT THIS ONE */
}
/* Final Setup (once everything is assigned) */
ftServer::SetupFtServer()
void ftServer::SetupFtServer()
{
/* make Controller */
mFtController = new ftController();
mFtController = new ftController(config_dir);
NotifyBase *cb = getNotify();
/* setup FiStore/Monitor */
@ -49,43 +89,42 @@ ftServer::SetupFtServer()
/* extras List */
mFtExtra = new ftExtraList();
mFtSearch = new ftFileSearch(mCacheStrapper, mFtExtra, mFiMon, mFiStore);
mFtSearch = new ftFileSearch();
//mFtSearch->addSearchMode(mCacheStrapper, RS_FILE_HINTS_CACHE);
mFtSearch->addSearchMode(mFtExtra, RS_FILE_HINTS_EXTRA);
//mFtSearch->addSearchMode(mFiMon, RS_FILE_HINTS_LOCAL);
//mFtSearch->addSearchMode(mFiStore, RS_FILE_HINTS_REMOTE);
mFtController -> setFtSearch(mFtSearch);
ftFiler -> setSaveBasePath(save_dir);
mFtController -> setSaveBasePath(save_dir);
mFtDataplex = ftDataMultiplex(this, mFtSearch);
return;
}
/* Assign important variables */
void setConfigDirectory(std::string path);
void setPQInterface(PQInterface *pqi);
/* Final Setup (once everything is assigned) */
void SetupFtServer();
/* add Config Items (Extra, Controller) */
void addConfigComponents(p3ConfigMgr *mgr);
void ftServer::setConfigDirectory(std::string path)
{
mConfigPath = path;
}
void ftServer::setPQInterface(PQInterface *pqi)
/* Control Interface */
};
void ftServer::StartupThreads()
{
/* start up Controller thread */
/* start up order - important for dependencies */
/* self contained threads */
/* startup ExtraList Thread */
mFtExtra->start();
/* startup Monitor Thread */
/* startup the FileMonitor (after cache load) */
mFiMon->setPeriod(600); /* 10 minutes */
/* start it up */
//mFiMon->setSharedDirectories(dbase_dirs);
mFiMon->start();
/* start own thread */
//start();
/* Controller thread */
mFtController->start();
}
CacheStrapper *ftServer::getCacheStrapper()
@ -98,6 +137,7 @@ CacheTransfer *ftServer::getCacheTransfer()
return mFtController;
}
/***************************************************************/
/********************** RsFiles Interface **********************/
/***************************************************************/
@ -113,23 +153,23 @@ bool ftServer::FileRequest(std::string fname, std::string hash,
return mFtController->FileRequest(fname, hash, size, dest, flags);
}
bool ftServer::FileCancel(std::string hash);
bool ftServer::FileCancel(std::string hash)
{
return mFtController->FileCancel(hash);
}
bool ftServer::FileControl(std::string hash, uint32_t flags);
bool ftServer::FileControl(std::string hash, uint32_t flags)
{
return mFtController->FileControl(hash, flags);
}
bool ftServer::FileClearCompleted();
bool ftServer::FileClearCompleted()
{
return mFtController->FileClearCompleted();
}
/* get Details of File Transfers */
bool ftServer::FileDownloads(std::list<std::string> &hashs);
bool ftServer::FileDownloads(std::list<std::string> &hashs)
{
return mFtController->FileDownloads(hashs);
}
@ -137,7 +177,7 @@ bool ftServer::FileDownloads(std::list<std::string> &hashs);
/* Directory Handling */
void ftServer::setDownloadDirectory(std::string path)
{
return mFtController->setDownloadDirectory(path);
mFtController->setDownloadDirectory(path);
}
std::string ftServer::getDownloadDirectory()
@ -145,12 +185,12 @@ std::string ftServer::getDownloadDirectory()
return mFtController->getDownloadDirectory();
}
void ftServer::setPartialsDirectory(std::string path);
void ftServer::setPartialsDirectory(std::string path)
{
return mFtController->setPartialsDirectory(path);
mFtController->setPartialsDirectory(path);
}
void ftServer::getPartialsDirectory()
std::string ftServer::getPartialsDirectory()
{
return mFtController->getPartialsDirectory();
}
@ -160,21 +200,21 @@ void ftServer::getPartialsDirectory()
/************************* Other Access ************************/
/***************************************************************/
bool ftServer::FileUploads(std::list<std::string> &hashs);
bool ftServer::FileUploads(std::list<std::string> &hashs)
{
return mFtUploader->FileUploads(hashes);
return mFtDataplex->FileUploads(hashes);
}
bool ftServer::FileDetails(std::string hash, uint32_t hintflags, FileInfo &info);
bool ftServer::FileDetails(std::string hash, uint32_t hintflags, FileInfo &info)
{
bool found = false;
if (hintflags | DOWNLOADING)
if (hintflags | RS_FILE_HINTS_DOWNLOAD)
{
found = mFtController->FileDetails(hash, info);
}
else if (hintflags | UPLOADING)
else if (hintflags | RS_FILE_HINTS_UPLOAD)
{
found = mFtUploader->FileDetails(hash, info);
found = mFtDataplex->FileDetails(hash, info);
}
if (!found)
@ -194,17 +234,17 @@ bool ftServer::ExtraFileAdd(std::string fname, std::string hash, uint32_t size,
return mFtExtra->addExtraFile(fname, hash, size, period, flags);
}
bool ftServer::ExtraFileRemove(std::string hash, uin32_t flags);
bool ftServer::ExtraFileRemove(std::string hash, uint32_t flags)
{
return mFtExtra->removeExtraFile(hash, flags);
}
bool ftServer::ExtraFileHash(std::string localpath, uint32_t period, uint32_t flags);
bool ftServer::ExtraFileHash(std::string localpath, uint32_t period, uint32_t flags)
{
return mFtExtra->hashExtraFile(localpath, period, flags);
}
bool ftServer::ExtraFileStatus(std::string localpath, FileInfo &info);
bool ftServer::ExtraFileStatus(std::string localpath, FileInfo &info)
{
return mFtExtra->hashExtraFileDone(localpath, info);
}
@ -213,12 +253,12 @@ bool ftServer::ExtraFileStatus(std::string localpath, FileInfo &info);
/******************** Directory Listing ************************/
/***************************************************************/
int ftServer::RequestDirDetails(std::string uid, std::string path, DirDetails &details);
int ftServer::RequestDirDetails(std::string uid, std::string path, DirDetails &details)
{
return mFiStore->RequestDirDetails(uid, path, details);
}
int ftServer::RequestDirDetails(void *ref, DirDetails &details, uint32_t flags);
int ftServer::RequestDirDetails(void *ref, DirDetails &details, uint32_t flags)
{
return mFiStore->RequestDirDetails(ref, details, flags);
}
@ -228,12 +268,12 @@ int ftServer::RequestDirDetails(void *ref, DirDetails &details, uint32_t flags);
/***************************************************************/
int ftServer::SearchKeywords(std::list<std::string> keywords, std::list<FileDetail> &results);
int ftServer::SearchKeywords(std::list<std::string> keywords, std::list<FileDetail> &results)
{
return mFiStore->SearchKeywords(keywords, results);
}
int ftServer::SearchBoolExp(Expression * exp, std::list<FileDetail> &results);
int ftServer::SearchBoolExp(Expression * exp, std::list<FileDetail> &results)
{
return mFiStore->searchBoolExp(exp, results);
}
@ -256,22 +296,22 @@ void ftServer::ForceDirectoryCheck()
bool ftServer::InDirectoryCheck()
{
return mFtMon->inDirectoryCheck();
return mFiMon->inDirectoryCheck();
}
bool ftServer::getSharedDirectories(std::list<std::string> &dirs)
{
return mFtMon->getSharedDirectories(dirs);
return mFiMon->getSharedDirectories(dirs);
}
bool ftServer::addSharedDirectory(std::string dir)
{
return mFtMon->addSharedDirectory(dir);
return mFiMon->addSharedDirectory(dir);
}
bool ftServer::removeSharedDirectory(std::string dir)
{
return mFtMon->removeSharedDirectory(dir);
return mFiMon->removeSharedDirectory(dir);
}
@ -286,16 +326,309 @@ bool ftServer::removeSharedDirectory(std::string dir)
protected:
/* Key Functions to be overloaded for Full Configuration */
virtual RsSerialiser *setupSerialiser();
virtual std::list<RsItem *> saveList(bool &cleanup);
virtual bool loadList(std::list<RsItem *> load);
RsSerialiser *ftServer::setupSerialiser()
{
return NULL;
}
private:
bool loadConfigMap(std::map<std::string, std::string> &configMap);
/******************* p3 Config Overload ************************/
std::list<RsItem *> ftServer::saveList(bool &cleanup)
{
std::list<RsItem *> list;
return list;
}
};
bool ftServer::loadList(std::list<RsItem *> load)
{
return true;
}
bool ftServer::loadConfigMap(std::map<std::string, std::string> &configMap)
{
return true;
}
/***************************************************************/
/********************** Data Flow **********************/
/***************************************************************/
/* Client Send */
bool ftServer::sendDataRequest(std::string peerId, std::string hash,
uint64_t size, uint64_t offset, uint32_t chunksize)
{
/* create a packet */
/* push to networking part */
RsFileRequest *rfi = new RsFileRequest();
/* id */
rfi->PeerId(peerId);
/* file info */
rfi->file.filesize = size;
rfi->file.hash = hash; /* ftr->hash; */
/* offsets */
rfi->fileoffset = offset; /* ftr->offset; */
rfi->chunksize = chunksize; /* ftr->chunk; */
mP3iface->SendFileRequest(rfi);
}
const uint32_t MAX_FT_CHUNK = 32 * 1024; /* 32K */
/* Server Send */
bool ftServer::sendData(std::string peerId, std::string hash, uint64_t size,
uint64_t baseoffset, uint32_t chunksize, void *data)
{
/* create a packet */
/* push to networking part */
uint32_t tosend = chunksize;
uint64_t offset = 0;
uint32_t chunk;
while(tosend > 0)
{
/* workout size */
chunk = MAX_FT_CHUNK;
if (chunk > tosend)
{
chunk = tosend;
}
/******** New Serialiser Type *******/
RsFileData *rfd = new RsFileData();
/* set id */
rfd->PeerId(peerId);
/* file info */
rfd->fd.file.filesize = size;
rfd->fd.file.hash = hash;
rfd->fd.file.name = ""; /* blank other data */
rfd->fd.file.path = "";
rfd->fd.file.pop = 0;
rfd->fd.file.age = 0;
rfd->fd.file_offset = baseoffset + offset;
/* file data */
rfd->fd.binData.setBinData(
&(((uint8_t *) data)[offset]), chunk);
mP3iface->SendFileData(rfd);
offset += chunk;
tosend -= chunk;
}
/* clean up data */
free(data);
}
int ftServer::tick()
{
rslog(RSL_DEBUG_BASIC, ftserverzone,
"filedexserver::tick()");
/* the new Cache Hack() */
FileStoreTick();
if (mP3iface == NULL)
{
std::ostringstream out;
rslog(RSL_DEBUG_BASIC, ftserverzone,
"filedexserver::tick() Invalid Interface()");
return 1;
}
int moreToTick = 0;
if (0 < mP3iface -> tick())
{
moreToTick = 1;
#ifdef DEBUG_TICK
std::cerr << "filedexserver::tick() moreToTick from mP3iface" << std::endl;
#endif
}
if (0 < handleInputQueues())
{
moreToTick = 1;
#ifdef DEBUG_TICK
std::cerr << "filedexserver::tick() moreToTick from InputQueues" << std::endl;
#endif
}
return moreToTick;
}
// This function needs to be divided up.
bool ftServer::handleInputQueues()
{
handleCacheData();
handleFileData();
}
bool ftServer::handleCacheData()
{
// get all the incoming results.. and print to the screen.
RsCacheRequest *cr;
RsCacheItem *ci;
// Loop through Search Results.
int i = 0;
int i_init = 0;
//std::cerr << "filedexserver::handleInputQueues()" << std::endl;
while((ci = mP3iface -> GetSearchResult()) != NULL)
{
//std::cerr << "filedexserver::handleInputQueues() Recvd SearchResult (CacheResponse!)" << std::endl;
std::ostringstream out;
if (i++ == i_init)
{
out << "Recieved Search Results:" << std::endl;
}
ci -> print(out);
rslog(RSL_DEBUG_BASIC, ftserverzone, out.str());
/* these go to the CacheStrapper! */
CacheData data;
data.cid = CacheId(ci->cacheType, ci->cacheSubId);
data.hash = ci->file.hash;
data.size = ci->file.filesize;
data.name = ci->file.name;
data.path = ci->file.path;
data.pid = ci->PeerId();
data.pname = mAuthMgr->getName(ci->PeerId());
mCacheStrapper->recvCacheResponse(data, time(NULL));
delete ci;
}
// now requested Searches.
i_init = i;
while((cr = mP3iface -> RequestedSearch()) != NULL)
{
/* just delete these */
std::ostringstream out;
out << "Requested Search:" << std::endl;
cr -> print(out);
rslog(RSL_DEBUG_BASIC, ftserverzone, out.str());
delete cr;
}
// Now handle it replacement (pushed cache results)
{
std::list<std::pair<RsPeerId, CacheData> > cacheUpdates;
std::list<std::pair<RsPeerId, CacheData> >::iterator it;
mCacheStrapper->getCacheUpdates(cacheUpdates);
for(it = cacheUpdates.begin(); it != cacheUpdates.end(); it++)
{
/* construct reply */
RsCacheItem *ci = new RsCacheItem();
/* id from incoming */
ci -> PeerId(it->first);
ci -> file.hash = (it->second).hash;
ci -> file.name = (it->second).name;
ci -> file.path = ""; // (it->second).path;
ci -> file.filesize = (it->second).size;
ci -> cacheType = (it->second).cid.type;
ci -> cacheSubId = (it->second).cid.subid;
#ifdef SERVER_DEBUG
std::ostringstream out2;
out2 << "Outgoing CacheStrapper Update -> RsCacheItem:" << std::endl;
ci -> print(out2);
std::cerr << out2.str() << std::endl;
#endif
//rslog(RSL_DEBUG_BASIC, ftserverzone, out2.str());
mP3iface -> SendSearchResult(ci);
}
}
}
bool ftServer::handleFileData()
{
// now File Input.
RsFileRequest *fr;
RsFileData *fd;
int i_init = 0;
int i = 0;
i_init = i;
while((fr = mP3iface -> GetFileRequest()) != NULL )
{
#ifdef SERVER_DEBUG
std::cerr << "filedexserver::handleInputQueues() Recvd ftFiler Request" << std::endl;
std::ostringstream out;
if (i == i_init)
{
out << "Incoming(Net) File Item:" << std::endl;
}
fr -> print(out);
rslog(RSL_DEBUG_BASIC, ftserverzone, out.str());
#endif
i++; /* count */
mFtDataplex->recvDataRequest(fr->PeerId(),
fr->file.hash, fr->file.filesize,
fr->fileoffset, fr->chunksize);
FileInfo(ffr);
delete fr;
}
// now File Data.
i_init = i;
while((fd = mP3iface -> GetFileData()) != NULL )
{
#ifdef SERVER_DEBUG
//std::cerr << "filedexserver::handleInputQueues() Recvd ftFiler Data" << std::endl;
std::ostringstream out;
if (i == i_init)
{
out << "Incoming(Net) File Data:" << std::endl;
}
fd -> print(out);
rslog(RSL_DEBUG_BASIC, ftserverzone, out.str());
#endif
i++; /* count */
/* incoming data */
mFtDataplex->recvData(fd->PeerId(),
fd->fd.file.hash, fd->fd.file.filesize,
fd->fd.file_offset,
fd->fd.binData.bin_len,
fd->fd.binData.bin_data);
/* we've stolen the data part -> so blank before delete
*/
fd->fd.TlvShallowClear();
delete fd;
}
if (i > 0)
{
return 1;
}
return 0;
}
/**********************************
**********************************
**********************************
*********************************/