added some minor feature to channels

- clean up of channels old (> storeperiod) cache file downloads called at rsglobal shutdown

- added extrafilehash and extra-remove to rschannels, so it chooses appropriate way to add files (correct book-keeping)
   - added extra files not copied to chan dir if over 100mb (high overhead)
- added limit to channels auto-download (1gig)




git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@2866 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
chrisparker126 2010-05-08 16:10:34 +00:00
parent 03d0058ca4
commit e1c4992680
6 changed files with 241 additions and 19 deletions

View File

@ -112,6 +112,19 @@ virtual bool getChannelMessage(std::string cId, std::string mId, ChannelMsgInfo
virtual bool ChannelMessageSend(ChannelMsgInfo &info) = 0;
virtual bool channelSubscribe(std::string cId, bool subscribe) = 0;
/*!
* This hashes a file which is not already shared by client or his peers,
* The file is copied into the channels directory if its not too large (> 100mb)
* @param path This is full path to file
* @param channel Id
*/
virtual bool channelExtraFileHash(std::string path, std::string chId, FileInfo& fInfo) = 0;
/*!
* This removes hashed extra files, and also removes channels directory copy if it exists
*/
virtual bool channelExtraFileRemove(std::string hash, std::string chId) = 0;
/****************************************/
};

View File

@ -166,8 +166,8 @@ void RsServer::ConfigFinalSave()
void RsServer::rsGlobalShutDown()
{
// TODO: cache should also clean up old files
mChannels->cleanUpOldFiles();
ConfigFinalSave(); // save configuration before exit
mConnMgr->shutdown(); /* Handles UPnP */
}

View File

@ -45,6 +45,7 @@
#include "services/p3Qblog.h"
#include "services/p3blogs.h"
#include "services/p3statusservice.h"
#include "services/p3channels.h"
/* The Main Interface Class - for controlling the server */
@ -174,7 +175,7 @@ class RsServer: public RsControl, public RsThread
p3MsgService *msgSrv;
p3ChatService *chatSrv;
p3StatusService *mStatusSrv;
p3Channels *mChannels;
/* caches (that need ticking) */
p3Ranking *mRanking;
p3Qblog *mQblog;

View File

@ -1104,7 +1104,8 @@ bool RsInit::LoadPassword(std::string id, std::string inPwd)
RsInitConfig::configDir = RsInitConfig::basedir + RsInitConfig::dirSeperator + id;
RsInitConfig::passwd = inPwd;
RsInitConfig::havePasswd = true;
if(inPwd != "")
RsInitConfig::havePasswd = true;
// Create the filename.
std::string basename = RsInitConfig::configDir + RsInitConfig::dirSeperator;
@ -2062,7 +2063,7 @@ int RsServer::StartupRetroShare()
mCacheStrapper -> addCachePair(cp4);
pqih -> addService(mForums); /* This must be also ticked as a service */
p3Channels *mChannels = new p3Channels(RS_SERVICE_TYPE_CHANNEL,
mChannels = new p3Channels(RS_SERVICE_TYPE_CHANNEL,
mCacheStrapper, mCacheTransfer, rsFiles,
localcachedir, remotecachedir, channelsdir);
@ -2213,7 +2214,6 @@ int RsServer::StartupRetroShare()
/**************************************************************************/
/* load caches and secondary data */
/**************************************************************************/

View File

@ -61,11 +61,12 @@ std::ostream &operator<<(std::ostream &out, const ChannelMsgInfo &info)
RsChannels *rsChannels = NULL;
/* Channels will be initially stored for 1 year
* remember 2^16 = 64K max units in store period.
/* remember 2^16 = 64K max units in store period.
* PUBPERIOD * 2^16 = max STORE PERIOD */
#define CHANNEL_STOREPERIOD (90*24*3600) /* 30 * 24 * 3600 - secs in a year */
#define CHANNEL_PUBPERIOD 600 /* 10 minutes ... (max = 455 days) */
#define CHANNEL_STOREPERIOD (30*24*3600) /* 30 * 24 * 3600 - secs in a 30 day month */
#define TEST_CHANNEL_STOREPERIOD (24*3600) /* one day */
#define CHANNEL_PUBPERIOD 6 /* 10 minutes ... (max = 455 days) */
#define MAX_AUTO_DL 1E9 /* auto download of attachment limit; 1 GIG */
p3Channels::p3Channels(uint16_t type, CacheStrapper *cs,
CacheTransfer *cft, RsFiles *files,
@ -82,6 +83,7 @@ p3Channels::p3Channels(uint16_t type, CacheStrapper *cs,
std::cerr << "p3Channels() Failed to create Channels Directory: " << mChannelsDir << std::endl;
}
return;
}
@ -227,6 +229,7 @@ bool p3Channels::ChannelMessageSend(ChannelMsgInfo &info)
std::list<FileInfo>::iterator it;
for(it = info.files.begin(); it != info.files.end(); it++)
{
RsTlvFileItem mfi;
mfi.hash = it -> hash;
mfi.name = it -> fname;
@ -241,6 +244,134 @@ bool p3Channels::ChannelMessageSend(ChannelMsgInfo &info)
}
bool p3Channels::channelExtraFileHash(std::string path, std::string chId, FileInfo& fInfo){
// get file name
std::string fname, fnameBuff;
std::string::reverse_iterator rit;
for(rit = path.rbegin(); *rit != '/' ; rit++){
fnameBuff.push_back(*rit);
}
// reverse string buff for correct file name
fname.append(fnameBuff.rbegin(), fnameBuff.rend());
bool fileTooLarge = false;
// first copy file into channel directory
if(!cpyMsgFileToChFldr(path, fname, chId, fileTooLarge)){
if(!fileTooLarge)
return false;
}
uint32_t flags = RS_FILE_HINTS_NETWORK_WIDE | RS_FILE_HINTS_EXTRA;
// then hash file, but hash file at original location if its too large
// and get file info too
if(fileTooLarge){
if(!mRsFiles->ExtraFileHash(path, CHANNEL_STOREPERIOD, flags))
return false;
fInfo.path = path;
}else{
std::string localpath = mChannelsDir + "/" + chId + "/" + fname;
if(!mRsFiles->ExtraFileHash(localpath, CHANNEL_STOREPERIOD, flags))
return false;
fInfo.path = localpath;
}
fInfo.fname = fname;
return true;
}
bool p3Channels::cpyMsgFileToChFldr(std::string path, std::string fname, std::string chId, bool& fileTooLarge){
FILE *outFile = NULL, *inFile = fopen(path.c_str(), "rb");
long buffSize = 0;
char* buffer = NULL;
if(inFile){
// obtain file size:
fseek (inFile , 0 , SEEK_END);
buffSize = ftell (inFile);
rewind (inFile);
// don't copy if file over 100mb
if(buffSize > (MAX_AUTO_DL / 10) ){
fileTooLarge = true;
fclose(inFile);
return false;
}
// allocate memory to contain the whole file:
buffer = (char*) malloc (sizeof(char)*buffSize);
if(!buffer){
fclose(inFile);
return false;
}
fread (buffer,1,buffSize,inFile);
fclose(inFile);
std::string localpath = mChannelsDir + "/" + chId + "/" + fname;
outFile = fopen(localpath.c_str(), "wb");
}
if(outFile){
fwrite(buffer, 1, buffSize, outFile);
fclose(outFile);
}else{
std::cerr << "p3Channels::cpyMsgFiletoFldr(): Failed to copy Channel Msg file to its channel folder"
<< std::endl;
if((buffSize > 0) && (buffer != NULL))
free(buffer);
return false;
}
if((buffSize > 0) && (buffer != NULL))
free(buffer);
return true;
}
bool p3Channels::channelExtraFileRemove(std::string hash, std::string chId){
uint32_t flags = RS_FILE_HINTS_NETWORK_WIDE | RS_FILE_HINTS_EXTRA;
/* remove copy from channels directory */
FileInfo fInfo;
mRsFiles->FileDetails(hash, flags, fInfo);
std::string chPath = mChannelsDir + "/" + chId + "/" + fInfo.fname;
if(remove(chPath.c_str()) == 0){
std::cerr << "p3Channel::channelExtraFileRemove() Removed file :"
<< chPath.c_str() << std::endl;
}else{
std::cerr << "p3Channel::channelExtraFileRemove() Failed to remove file :"
<< chPath.c_str() << std::endl;
}
return mRsFiles->ExtraFileRemove(hash, flags);
}
std::string p3Channels::createChannel(std::wstring channelName, std::wstring channelDesc, uint32_t channelFlags)
{
std::string id = createGroup(channelName, channelDesc, channelFlags);
@ -321,9 +452,7 @@ bool p3Channels::locked_eventDuplicateMsg(GroupInfo *grp, RsDistribMsg *msg, std
* it is upto ftserver/ftcontroller/ftextralist
* */
//bool download = (grp->flags & (RS_DISTRIB_ADMIN |
// RS_DISTRIB_PUBLISH | RS_DISTRIB_SUBSCRIBED))
bool download = (grp->flags & RS_DISTRIB_SUBSCRIBED);
bool download = (grp->flags & RS_DISTRIB_SUBSCRIBED);
/* check subscribed */
if (!download)
@ -334,7 +463,7 @@ bool p3Channels::locked_eventDuplicateMsg(GroupInfo *grp, RsDistribMsg *msg, std
/* check age */
time_t age = time(NULL) - msg->timestamp;
if (age > (time_t)DOWNLOAD_PERIOD)
if (age > (time_t)DOWNLOAD_PERIOD )
{
return true;
}
@ -365,8 +494,9 @@ bool p3Channels::locked_eventDuplicateMsg(GroupInfo *grp, RsDistribMsg *msg, std
std::cerr << " to: " << localpath << " from: " << id << std::endl;
#endif
mRsFiles->FileRequest(fname, hash, size,
localpath, flags, srcIds);
if(size < MAX_AUTO_DL)
mRsFiles->FileRequest(fname, hash, size,
localpath, flags, srcIds);
}
@ -399,8 +529,6 @@ bool p3Channels::locked_eventNewMsg(GroupInfo *grp, RsDistribMsg *msg, std::stri
}
void p3Channels::locked_notifyGroupChanged(GroupInfo &grp, uint32_t flags)
{
std::string grpId = grp.grpId;
@ -472,7 +600,72 @@ void p3Channels::locked_notifyGroupChanged(GroupInfo &grp, uint32_t flags)
return p3GroupDistrib::locked_notifyGroupChanged(grp, flags);
}
void p3Channels::cleanUpOldFiles(){
time_t now = time(NULL);
std::list<ChannelInfo> chList;
std::list<ChannelInfo>::iterator ch_it;
// first get channel list
if(!getChannelList(chList))
return;
std::list<ChannelMsgSummary> msgList;
std::list<ChannelMsgSummary>::iterator msg_it;
// then msg for each channel
for(ch_it = chList.begin(); ch_it != chList.end(); ch_it++){
if(!getChannelMsgList(ch_it->channelId, msgList))
continue;
std::string channelname = ch_it->channelId;
std::string localpath = mChannelsDir + "/" + channelname;
for(msg_it = msgList.begin(); msg_it != msgList.end(); msg_it++){
ChannelMsgInfo chMsgInfo;
if(!getChannelMessage(ch_it->channelId, msg_it->msgId, chMsgInfo))
continue;
// if msg not old, leave it alone
if( chMsgInfo.ts > (now - CHANNEL_STOREPERIOD))
continue;
std::list<FileInfo>::iterator file_it;
// get the files
for(file_it = chMsgInfo.files.begin(); file_it != chMsgInfo.files.end(); file_it++){
std::string msgFile = localpath + "/" + file_it->fname;
if(mRsFiles){
if(mRsFiles->ExtraFileRemove(file_it->hash, ~(RS_FILE_HINTS_DOWNLOAD | RS_FILE_HINTS_UPLOAD)))
std::cerr << "p3Channels::clearOldFIles() failed to remove files from extras" << std::endl;
if(remove(msgFile.c_str()) == 0){
std::cerr << "p3Channels::clearOldFiles() file removed: "
<< msgFile << std::endl;
}else{
std::cerr << "p3Channels::clearOldFiles() failed to remove file: "
<< msgFile << std::endl;
}
}else{
std::cerr << "p3Channels::cleanUpOldFiles() : Pointer passed to (this) Invalid" << std::endl;
}
}
}
}
// clean up local caches
clear_local_caches(now); //how about remote caches, remember its hardwired
return;
}
/****************************************/

View File

@ -33,7 +33,12 @@
#include "serialiser/rstlvtypes.h"
#include "serialiser/rschannelitems.h"
//! Channels is a distributed 'feed' service
/*!
* The channels service allows peers to subscirbe to each feeds published by
* their peers. Using the p3groupdistrib class to enable management of
* publication and viewing keys
*/
class p3Channels: public p3GroupDistrib, public RsChannels
{
public:
@ -42,6 +47,12 @@ class p3Channels: public p3GroupDistrib, public RsChannels
std::string srcdir, std::string storedir, std::string channelsdir);
virtual ~p3Channels();
/*!
* cleans up local info and dowloaded files older than one month,
* should be called during shutdown of rs
*/
void cleanUpOldFiles();
/****************************************/
/********* rsChannels Interface ***********/
@ -57,6 +68,8 @@ virtual bool getChannelMessage(std::string cId, std::string mId, ChannelMsgInfo
virtual bool ChannelMessageSend(ChannelMsgInfo &info);
virtual bool channelSubscribe(std::string cId, bool subscribe);
virtual bool channelExtraFileHash(std::string path, std::string chId, FileInfo& fInfo);
virtual bool channelExtraFileRemove(std::string hash, std::string chId);
/***************************************************************************************/
/****************** Event Feedback (Overloaded form p3distrib) *************************/
@ -82,6 +95,8 @@ virtual RsDistribGrp *locked_createPrivateDistribGrp(GroupInfo &info);
private:
bool cpyMsgFileToChFldr(std::string path, std::string fname, std::string chId, bool& fileTooLarge);
RsFiles *mRsFiles;
std::string mChannelsDir;