* Added Configurations to new file transfer.

* Enabled config in p3file-startup.cc
 * Enabled resumeTransfers
 * Added new RsFileConfigItem to serialiser.
 * extended ftFiMonitor to use Configuration.
 * bug fix to add/remove Shared Dirs.
 * Increased Channel/forum periods to 3/12 months.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@806 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2008-11-15 20:00:29 +00:00
parent 941e59f6e9
commit 66bf56207d
18 changed files with 768 additions and 111 deletions

View file

@ -47,13 +47,11 @@
#include "pqi/p3connmgr.h"
#include "serialiser/rsconfigitems.h"
#define CONTROL_DEBUG 1
#warning CONFIG_FT_CONTROL Not defined in p3cfgmgr.h
const uint32_t CONFIG_FT_CONTROL = 1;
ftFileControl::ftFileControl()
:mTransfer(NULL), mCreator(NULL),
mState(0), mSize(0), mFlags(0)
@ -75,7 +73,7 @@ ftFileControl::ftFileControl(std::string fname,
}
ftController::ftController(CacheStrapper *cs, ftDataMultiplex *dm, std::string configDir)
:CacheTransfer(cs), p3Config(CONFIG_FT_CONTROL), mDataplex(dm)
:CacheTransfer(cs), p3Config(CONFIG_TYPE_FT_CONTROL), mDataplex(dm)
{
/* TODO */
}
@ -319,6 +317,7 @@ bool ftController::completeFile(std::string hash)
}
IndicateConfigChanged(); /* completed transfer -> save */
return true;
}
@ -367,6 +366,8 @@ bool ftController::FileRequest(std::string fname, std::string hash,
* This is important as some guis request duplicate files regularly.
*/
{ RsStackMutex stack(ctrlMutex); /******* LOCKED ********/
std::map<std::string, ftFileControl>::iterator dit;
dit = mDownloads.find(hash);
if (dit != mDownloads.end())
@ -402,6 +403,8 @@ bool ftController::FileRequest(std::string fname, std::string hash,
(dit->second).mTransfer->addFileSource(*it);
setPeerState(dit->second.mTransfer, *it,
rate, mConnMgr->isOnline(*it));
IndicateConfigChanged(); /* new peer for transfer -> save */
}
if (srcIds.size() == 0)
@ -414,6 +417,7 @@ bool ftController::FileRequest(std::string fname, std::string hash,
return true;
}
} /******* UNLOCKED ********/
bool doCallback = false;
uint32_t callbackCode = 0;
@ -497,14 +501,20 @@ bool ftController::FileRequest(std::string fname, std::string hash,
//std::map<std::string, ftFileCreator *> mFileCreators;
/* add in new item for download */
std::string savepath = mPartialsPath + "/" + hash;
std::string destination = dest + "/" + fname;
std::string savepath;
std::string destination;
{ RsStackMutex stack(ctrlMutex); /******* LOCKED ********/
savepath = mPartialsPath + "/" + hash;
destination = dest + "/" + fname;
/* if no destpath - send to download directory */
if (dest == "")
{
destination = mDownloadPath + "/" + fname;
}
} /******* UNLOCKED ********/
ftFileCreator *fc = new ftFileCreator(savepath, size, hash, 0);
ftTransferModule *tm = new ftTransferModule(fc, mDataplex,this);
@ -536,11 +546,13 @@ bool ftController::FileRequest(std::string fname, std::string hash,
setPeerState(tm, *it, rate, mConnMgr->isOnline(*it));
}
/* only need to lock before to fiddle with own variables */
RsStackMutex stack(ctrlMutex); /******* LOCKED ********/
mDownloads[hash] = ftfc;
mSlowQueue.push_back(hash);
IndicateConfigChanged(); /* completed transfer -> save */
return true;
}
@ -749,6 +761,7 @@ bool ftController::FileDetails(std::string hash, FileInfo &info)
/* extract details */
info.hash = hash;
info.fname = it->second.mName;
info.path = RsDirUtil::removeTopDir(it->second.mDestination); /* remove fname */
/* get list of sources from transferModule */
std::list<std::string> peerIds;
@ -896,24 +909,6 @@ void ftController::statusChange(const std::list<pqipeer> &plist)
}
}
}
/* p3Config Interface */
RsSerialiser *ftController::setupSerialiser()
{
return NULL;
}
std::list<RsItem *> ftController::saveList(bool &cleanup)
{
std::list<RsItem *> emptyList;
return emptyList;
}
bool ftController::loadList(std::list<RsItem *> load)
{
return false;
}
/* Cache Interface */
bool ftController::RequestCacheFile(RsPeerId id, std::string path, std::string hash, uint64_t size)
@ -946,4 +941,202 @@ bool ftController::CancelCacheFile(RsPeerId id, std::string path, std::string ha
return true;
}
const std::string download_dir_ss("DOWN_DIR");
const std::string partial_dir_ss("PART_DIR");
/* p3Config Interface */
RsSerialiser *ftController::setupSerialiser()
{
RsSerialiser *rss = new RsSerialiser();
/* add in the types we need! */
rss->addSerialType(new RsFileConfigSerialiser());
rss->addSerialType(new RsGeneralConfigSerialiser());
return rss;
}
std::list<RsItem *> ftController::saveList(bool &cleanup)
{
std::list<RsItem *> saveData;
/* it can delete them! */
cleanup = true;
/* create a key/value set for most of the parameters */
std::map<std::string, std::string> configMap;
std::map<std::string, std::string>::iterator mit;
std::list<std::string>::iterator it;
/* basic control parameters */
configMap[download_dir_ss] = getDownloadDirectory();
configMap[partial_dir_ss] = getPartialsDirectory();
RsConfigKeyValueSet *rskv = new RsConfigKeyValueSet();
/* Convert to TLV */
for(mit = configMap.begin(); mit != configMap.end(); mit++)
{
RsTlvKeyValue kv;
kv.key = mit->first;
kv.value = mit->second;
rskv->tlvkvs.pairs.push_back(kv);
}
/* Add KeyValue to saveList */
saveData.push_back(rskv);
/* get list of Downloads ....
* strip out Caches / ExtraList / Channels????
* (anything with a callback?)
* - most systems will restart missing files.
*/
/* get Details of File Transfers */
std::list<std::string> hashs;
FileDownloads(hashs);
for(it = hashs.begin(); it != hashs.end(); it++)
{
/* stack mutex released each loop */
RsStackMutex stack(ctrlMutex); /******* LOCKED ********/
std::map<std::string, ftFileControl>::iterator fit;
fit = mDownloads.find(*it);
if (fit == mDownloads.end())
{
continue;
}
/* ignore callback ones */
if (fit->second.mDoCallback)
{
continue;
}
if ((fit->second).mCreator->finished())
{
continue;
}
/* make RsFileTransfer item for save list */
RsFileTransfer *rft = new RsFileTransfer();
/* what data is important? */
rft->file.name = fit->second.mName;
rft->file.hash = fit->second.mHash;
rft->file.filesize = fit->second.mSize;
rft->file.path = RsDirUtil::removeTopDir(fit->second.mDestination); /* remove fname */
//rft->flags = fit->second.mFlags;
fit->second.mTransfer->getFileSources(rft->allPeerIds.ids);
saveData.push_back(rft);
}
/* list completed! */
return saveData;
}
bool ftController::loadList(std::list<RsItem *> load)
{
std::list<RsItem *>::iterator it;
std::list<RsTlvKeyValue>::iterator kit;
RsConfigKeyValueSet *rskv;
RsFileTransfer *rsft;
#ifdef CONTROL_DEBUG
std::cerr << "ftController::loadList() Item Count: " << load.size();
std::cerr << std::endl;
#endif
for(it = load.begin(); it != load.end(); it++)
{
/* switch on type */
if (NULL != (rskv = dynamic_cast<RsConfigKeyValueSet *>(*it)))
{
/* make into map */
std::map<std::string, std::string> configMap;
for(kit = rskv->tlvkvs.pairs.begin();
kit != rskv->tlvkvs.pairs.end(); kit++)
{
configMap[kit->key] = kit->value;
}
loadConfigMap(configMap);
/* cleanup */
delete (*it);
}
else if (NULL != (rsft = dynamic_cast<RsFileTransfer *>(*it)))
{
RsStackMutex stack(ctrlMutex); /******* LOCKED ********/
/* save to the preLoad list */
mResumeTransferList.push_back(rsft);
}
else
{
/* cleanup */
delete (*it);
}
}
return true;
}
bool ftController::loadConfigMap(std::map<std::string, std::string> &configMap)
{
std::map<std::string, std::string>::iterator mit;
std::string str_true("true");
std::string empty("");
std::string dir = "notempty";
if (configMap.end() != (mit = configMap.find(download_dir_ss)))
{
setDownloadDirectory(mit->second);
}
if (configMap.end() != (mit = configMap.find(partial_dir_ss)))
{
//setPartialsDirectory(mit->second);
}
return true;
}
bool ftController::ResumeTransfers()
{
std::list<RsFileTransfer *> resumeList;
std::list<RsFileTransfer *>::iterator it;
{ RsStackMutex stack(ctrlMutex); /******* LOCKED ********/
resumeList = mResumeTransferList;
mResumeTransferList.clear();
}
for(it = resumeList.begin(); it != resumeList.end(); it++)
{
/* do File request */
std::string fname = (*it)->file.name;
std::string hash = (*it)->file.hash;
uint64_t size = (*it)->file.filesize;
std::string dest = (*it)->file.path;
uint32_t flags = 0; //(*it)->flags;
std::list<std::string> srcIds = (*it)->allPeerIds.ids;
FileRequest(fname,hash,size,dest,flags,srcIds);
delete (*it);
}
return true;
}

View file

@ -51,6 +51,7 @@ class ftDataMultiplex;
#include "pqi/p3cfgmgr.h"
#include "rsiface/rsfiles.h"
#include "serialiser/rsconfigitems.h"
#include <map>
@ -94,6 +95,7 @@ class ftController: public CacheTransfer, public RsThread, public pqiMonitor, pu
ftController(CacheStrapper *cs, ftDataMultiplex *dm, std::string configDir);
void setFtSearchNExtra(ftSearch *, ftExtraList *);
bool ResumeTransfers();
virtual void run();
@ -120,6 +122,7 @@ std::string getDownloadDirectory();
std::string getPartialsDirectory();
bool FileDetails(std::string hash, FileInfo &info);
/***************************************************************/
/********************** Cache Transfer *************************/
/***************************************************************/
@ -143,6 +146,8 @@ virtual void statusChange(const std::list<pqipeer> &plist);
virtual RsSerialiser *setupSerialiser();
virtual std::list<RsItem *> saveList(bool &cleanup);
virtual bool loadList(std::list<RsItem *> load);
bool loadConfigMap(std::map<std::string, std::string> &configMap);
private:
@ -179,6 +184,9 @@ bool setPeerState(ftTransferModule *tm, std::string id,
std::list<std::string> mStreamQueue;
std::list<std::string> mFastQueue;
/* Config Load */
std::list<RsFileTransfer *> mResumeTransferList;
/* callback list (for File Completion) */
RsMutex doneMutex;
std::list<std::string> mDone;
@ -186,37 +194,3 @@ bool setPeerState(ftTransferModule *tm, std::string id,
#endif
#if 0
class CacheTransfer
{
public:
CacheTransfer(CacheStrapper *cs) :strapper(cs) { return; }
virtual ~CacheTransfer() {}
/* upload side of things .... searches through CacheStrapper. */
bool FindCacheFile(std::string hash, std::string &path, uint64_t &size);
/* At the download side RequestCache() => overloaded RequestCacheFile()
* the class should then call CompletedCache() or FailedCache()
*/
bool RequestCache(CacheData &data, CacheStore *cbStore); /* request from CacheStore */
protected:
/* to be overloaded */
virtual bool RequestCacheFile(RsPeerId id, std::string path, std::string hash, uint64_t size);
virtual bool CancelCacheFile(RsPeerId id, std::string path, std::string hash, uint64_t size);
bool CompletedCache(std::string hash); /* internal completion -> does cb */
bool FailedCache(std::string hash); /* internal completion -> does cb */
private:
CacheStrapper *strapper;
std::map<std::string, CacheData> cbData;
std::map<std::string, CacheStore *> cbStores;
};
#endif

View file

@ -26,6 +26,8 @@
#include "ft/ftdbase.h"
#include "util/rsdir.h"
#include "serialiser/rsconfigitems.h"
#define DB_DEBUG 1
ftFiStore::ftFiStore(CacheStrapper *cs, CacheTransfer *cft, NotifyBase *cb_in,
@ -118,7 +120,7 @@ bool ftFiStore::search(std::string hash, uint64_t size, uint32_t hintflags, File
ftFiMonitor::ftFiMonitor(CacheStrapper *cs, std::string cachedir, std::string pid)
:FileIndexMonitor(cs, cachedir, pid)
:FileIndexMonitor(cs, cachedir, pid), p3Config(CONFIG_TYPE_FT_SHARED)
{
return;
}
@ -154,6 +156,96 @@ bool ftFiMonitor::search(std::string hash, uint64_t size, uint32_t hintflags, Fi
return false;
};
/******* LOAD / SAVE CONFIG List.
*
*
*
*
*/
RsSerialiser *ftFiMonitor::setupSerialiser()
{
RsSerialiser *rss = new RsSerialiser();
/* add in the types we need! */
rss->addSerialType(new RsFileConfigSerialiser());
return rss;
}
std::list<RsItem *> ftFiMonitor::saveList(bool &cleanup)
{
std::list<RsItem *> sList;
cleanup = true;
#ifdef DB_DEBUG
std::cerr << "ftFiMonitor::saveList()";
std::cerr << std::endl;
#endif
/* get list of directories */
std::list<std::string> dirList;
std::list<std::string>::iterator it;
getSharedDirectories(dirList);
for(it = dirList.begin(); it != dirList.end(); it++)
{
RsFileConfigItem *fi = new RsFileConfigItem();
fi->file.path = *it;
sList.push_back(fi);
}
return sList;
}
bool ftFiMonitor::loadList(std::list<RsItem *> load)
{
/* for each item, check it exists ....
* - remove any that are dead (or flag?)
*/
#ifdef DEBUG_ELIST
std::cerr << "ftFiMonitor::loadList()";
std::cerr << std::endl;
#endif
time_t ts = time(NULL);
std::list<std::string> dirList;
std::list<RsItem *>::iterator it;
for(it = load.begin(); it != load.end(); it++)
{
RsFileConfigItem *fi = dynamic_cast<RsFileConfigItem *>(*it);
if (!fi)
{
delete (*it);
continue;
}
/* ensure that it exists? */
dirList.push_back(fi->file.path);
}
/* set directories */
setSharedDirectories(dirList);
return true;
}
void ftFiMonitor::setSharedDirectories(std::list<std::string> dirList)
{
FileIndexMonitor::setSharedDirectories(dirList);
/* flag for config */
IndicateConfigChanged();
}
ftCacheStrapper::ftCacheStrapper(p3AuthMgr *am, p3ConnectMgr *cm)
:CacheStrapper(am, cm)
{

View file

@ -34,6 +34,7 @@
*/
#include "ft/ftsearch.h"
#include "pqi/p3cfgmgr.h"
#include "dbase/fistore.h"
#include "dbase/fimonitor.h"
@ -50,7 +51,7 @@ class ftFiStore: public FileIndexStore, public ftSearch
virtual bool search(std::string hash, uint64_t size, uint32_t hintflags, FileInfo &info) const;
};
class ftFiMonitor: public FileIndexMonitor, public ftSearch
class ftFiMonitor: public FileIndexMonitor, public ftSearch, public p3Config
{
public:
ftFiMonitor(CacheStrapper *cs, std::string cachedir, std::string pid);
@ -58,6 +59,19 @@ class ftFiMonitor: public FileIndexMonitor, public ftSearch
/* overloaded search function */
virtual bool search(std::string hash, uint64_t size, uint32_t hintflags, FileInfo &info) const;
/* overloaded set dirs enables config indication */
virtual void setSharedDirectories(std::list<std::string> dirList);
/***
* Configuration - store shared directories
*/
protected:
virtual RsSerialiser *setupSerialiser();
virtual std::list<RsItem *> saveList(bool &cleanup);
virtual bool loadList(std::list<RsItem *> load);
};
class ftCacheStrapper: public CacheStrapper, public ftSearch

View file

@ -24,12 +24,13 @@
*/
#include "ft/ftextralist.h"
#include "serialiser/rsconfigitems.h"
#include "util/rsdir.h"
#define DEBUG_ELIST 1
ftExtraList::ftExtraList()
:p3Config(CONFIG_FT_EXTRA_LIST)
:p3Config(CONFIG_TYPE_FT_EXTRA_LIST)
{
return;
}
@ -122,13 +123,13 @@ void ftExtraList::hashAFile()
{
RsStackMutex stack(extMutex);
details.start = time(NULL);
/* stick it in the available queue */
mFiles[details.info.hash] = details;
/* add to the path->hash map */
mHashedList[details.info.path] = details.info.hash;
IndicateConfigChanged();
}
}
@ -157,14 +158,14 @@ bool ftExtraList::addExtraFile(std::string path, std::string hash,
details.info.fname = RsDirUtil::getTopDir(path);
details.info.hash = hash;
details.info.size = size;
details.start = time(NULL);
details.info.age = time(NULL) + period; /* if time > this... cleanup */
details.flags = flags;
details.period = period;
/* stick it in the available queue */
mFiles[details.info.hash] = details;
IndicateConfigChanged();
return true;
}
@ -189,6 +190,8 @@ bool ftExtraList::removeExtraFile(std::string hash, uint32_t flags)
mFiles.erase(it);
IndicateConfigChanged();
return true;
}
@ -212,7 +215,7 @@ bool ftExtraList::cleanupOldFiles()
for(it = mFiles.begin(); it != mFiles.end(); it++)
{
/* check timestamps */
if (it->second.start + it->second.period < (unsigned) now)
if (it->second.info.age < (unsigned) now)
{
toRemove.push_back(it->first);
}
@ -225,14 +228,25 @@ bool ftExtraList::cleanupOldFiles()
{
if (mFiles.end() != (it = mFiles.find(*rit)))
{
cleanupEntry(it->second.info.path, it->second.flags);
mFiles.erase(it);
}
}
IndicateConfigChanged();
}
return true;
}
bool ftExtraList::cleanupEntry(std::string path, uint32_t flags)
{
if (flags & RS_FILE_CONFIG_CLEANUP_DELETE)
{
/* Delete the file? - not yet! */
}
return true;
}
/***
* Hash file, and add to the files,
* file is removed after period.
@ -252,6 +266,7 @@ bool ftExtraList::hashExtraFile(std::string path, uint32_t period, uint32_t fla
RsStackMutex stack(extMutex);
FileDetails details(path, period, flags);
details.info.age = time(NULL) + period;
mToHash.push_back(details);
return true;
@ -291,8 +306,6 @@ bool ftExtraList::search(std::string hash, uint64_t size, uint32_t hintflags,
std::cerr << std::endl;
#endif
RsStackMutex stack(extMutex);
/* find hash */
std::map<std::string, FileDetails>::const_iterator fit;
if (mFiles.end() == (fit = mFiles.find(hash)))
@ -312,17 +325,114 @@ bool ftExtraList::search(std::string hash, uint64_t size, uint32_t hintflags,
RsSerialiser *ftExtraList::setupSerialiser()
{
return NULL;
RsSerialiser *rss = new RsSerialiser();
/* add in the types we need! */
rss->addSerialType(new RsFileConfigSerialiser());
return rss;
}
std::list<RsItem *> ftExtraList::saveList(bool &cleanup)
{
std::list<RsItem *> sList;
cleanup = true;
/* called after each item is added */
/* create a list of fileitems with
* age used to specify its timeout.
*/
#ifdef DEBUG_ELIST
std::cerr << "ftExtraList::saveList()";
std::cerr << std::endl;
#endif
RsStackMutex stack(extMutex);
std::map<std::string, FileDetails>::const_iterator it;
for(it = mFiles.begin(); it != mFiles.end(); it++)
{
RsFileConfigItem *fi = new RsFileConfigItem();
fi->file.path = (it->second).info.path;
fi->file.name = (it->second).info.fname;
fi->file.hash = (it->second).info.hash;
fi->file.filesize = (it->second).info.size;
fi->file.age = (it->second).info.age;
fi->flags = (it->second).flags;
sList.push_back(fi);
}
return sList;
}
bool ftExtraList::loadList(std::list<RsItem *> load)
{
/* for each item, check it exists ....
* - remove any that are dead (or flag?)
*/
#ifdef DEBUG_ELIST
std::cerr << "ftExtraList::loadList()";
std::cerr << std::endl;
#endif
time_t ts = time(NULL);
std::list<RsItem *>::iterator it;
for(it = load.begin(); it != load.end(); it++)
{
RsFileConfigItem *fi = dynamic_cast<RsFileConfigItem *>(*it);
if (!fi)
{
delete (*it);
continue;
}
/* open file */
FILE *fd = fopen(fi->file.path.c_str(), "rb");
if (fd == NULL)
{
delete (*it);
continue;
}
fclose(fd);
if (ts > fi->file.age)
{
/* to old */
cleanupEntry(fi->file.path, fi->flags);
delete (*it);
}
/* add into system */
FileDetails file;
RsStackMutex stack(extMutex);
FileDetails details;
details.info.path = fi->file.path;
details.info.fname = fi->file.name;
details.info.hash = fi->file.hash;
details.info.size = fi->file.filesize;
details.info.age = fi->file.age; /* time that we remove it. */
details.flags = fi->flags;
/* stick it in the available queue */
mFiles[details.info.hash] = details;
delete (*it);
/* short sleep */
usleep(1000); /* 1000 per second */
}
return true;
}

View file

@ -103,9 +103,6 @@ const uint32_t FT_DETAILS_CLEANUP = 0x0100; /* remove when it expires */
const uint32_t FT_DETAILS_LOCAL = 0x0001;
const uint32_t FT_DETAILS_REMOTE = 0x0002;
#warning CONFIG_FT_EXTRA_LIST Not defined in p3cfgmgr.h
const uint32_t CONFIG_FT_EXTRA_LIST = 1;
const uint32_t CLEANUP_PERIOD = 600; /* 10 minutes */
@ -159,6 +156,7 @@ virtual bool loadList(std::list<RsItem *> load);
/* Worker Functions */
void hashAFile();
bool cleanupOldFiles();
bool cleanupEntry(std::string path, uint32_t flags);
mutable RsMutex extMutex;

View file

@ -380,14 +380,41 @@ bool ftServer::removeSharedDirectory(std::string dir)
std::list<std::string> dirList;
std::list<std::string>::iterator it;
#ifdef SERVER_DEBUG
std::cerr << "ftServer::removeSharedDirectory(" << dir << ")";
std::cerr << std::endl;
#endif
mFiMon->getSharedDirectories(dirList);
if (dirList.end() != (it =
#ifdef SERVER_DEBUG
for(it = dirList.begin(); it != dirList.end(); it++)
{
std::cerr << "ftServer::removeSharedDirectory()";
std::cerr << " existing: " << *it;
std::cerr << std::endl;
}
#endif
if (dirList.end() == (it =
std::find(dirList.begin(), dirList.end(), dir)))
{
#ifdef SERVER_DEBUG
std::cerr << "ftServer::removeSharedDirectory()";
std::cerr << " Cannot Find Directory... Fail";
std::cerr << std::endl;
#endif
return false;
}
#ifdef SERVER_DEBUG
std::cerr << "ftServer::removeSharedDirectory()";
std::cerr << " Updating Directories";
std::cerr << std::endl;
#endif
dirList.erase(it);
mFiMon->setSharedDirectories(dirList);
@ -755,4 +782,22 @@ FileInfo(ffr);
**********************************
*********************************/
/***************************** CONFIG ****************************/
bool ftServer::addConfiguration(p3ConfigMgr *cfgmgr)
{
/* add all the subbits to config mgr */
cfgmgr->addConfiguration("ft_shared.cfg", mFiMon);
cfgmgr->addConfiguration("ft_extra.cfg", mFtExtra);
cfgmgr->addConfiguration("ft_transfers.cfg", mFtController);
return true;
}
bool ftServer::ResumeTransfers()
{
mFtController->ResumeTransfers();
return true;
}

View file

@ -179,6 +179,10 @@ virtual bool sendDataRequest(std::string peerId,
/*************** Internal Transfer Fns *************************/
virtual int tick();
/* Configuration */
bool addConfiguration(p3ConfigMgr *cfgmgr);
bool ResumeTransfers();
private:
bool handleInputQueues();
bool handleCacheData();