* More bugfixes for file transfer.

* Added File Transfer / ExtraList to channels.
 * fixed mutex deadlock.
 * added slow transfer for background tf.
 * added checks to FileRequest to accumulate peers.
 * added ExtraList callback.
 * etc, etc.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@797 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2008-11-09 22:17:20 +00:00
parent 1e33267951
commit aee6cb85b4
14 changed files with 385 additions and 51 deletions

View file

@ -68,13 +68,24 @@ RsChannels *rsChannels = NULL;
#define CHANNEL_STOREPERIOD 10000
#define CHANNEL_PUBPERIOD 600
p3Channels::p3Channels(uint16_t type, CacheStrapper *cs, CacheTransfer *cft,
std::string srcdir, std::string storedir, std::string chanDir)
p3Channels::p3Channels(uint16_t type, CacheStrapper *cs,
CacheTransfer *cft, RsFiles *files,
std::string srcdir, std::string storedir, std::string chanDir)
:p3GroupDistrib(type, cs, cft, srcdir, storedir,
CONFIG_TYPE_CHANNELS, CHANNEL_STOREPERIOD, CHANNEL_PUBPERIOD),
mRsFiles(files),
mChannelsDir(chanDir)
{
//loadDummyData();
/* create chanDir */
if (!RsDirUtil::checkCreateDirectory(mChannelsDir))
{
std::cerr << "p3Channels() Failed to create Channels Directory: ";
std::cerr << mChannelsDir;
std::cerr << std::endl;
}
return;
}
@ -269,6 +280,24 @@ RsDistribGrp *p3Channels::locked_createPrivateDistribGrp(GroupInfo &info)
bool p3Channels::channelSubscribe(std::string cId, bool subscribe)
{
std::cerr << "p3Channels::channelSubscribe() ";
std::cerr << cId;
std::cerr << std::endl;
if (subscribe)
{
std::string channeldir = mChannelsDir + "/" + cId;
/* create chanDir */
if (!RsDirUtil::checkCreateDirectory(channeldir))
{
std::cerr << "p3Channels::channelSubscribe()";
std::cerr << " Failed to create Channels Directory: ";
std::cerr << channeldir;
std::cerr << std::endl;
}
}
return subscribeToGroup(cId, subscribe);
}
@ -285,6 +314,10 @@ bool p3Channels::locked_eventUpdateGroup(GroupInfo *info, bool isNew)
std::string msgId;
std::string nullId;
std::cerr << "p3Channels::locked_eventUpdateGroup() ";
std::cerr << grpId;
std::cerr << std::endl;
if (isNew)
{
getPqiNotify()->AddFeedItem(RS_FEED_ITEM_CHAN_NEW, grpId, msgId, nullId);
@ -294,20 +327,137 @@ bool p3Channels::locked_eventUpdateGroup(GroupInfo *info, bool isNew)
getPqiNotify()->AddFeedItem(RS_FEED_ITEM_CHAN_UPDATE, grpId, msgId, nullId);
}
if (info->flags & RS_DISTRIB_SUBSCRIBED)
{
std::string channeldir = mChannelsDir + "/" + grpId;
std::cerr << "p3Channels::locked_eventUpdateGroup() ";
std::cerr << " creating directory: " << channeldir;
std::cerr << std::endl;
/* create chanDir */
if (!RsDirUtil::checkCreateDirectory(channeldir))
{
std::cerr << "p3Channels::locked_eventUpdateGroup() ";
std::cerr << "Failed to create Channels Directory: ";
std::cerr << channeldir;
std::cerr << std::endl;
}
}
return true;
}
bool p3Channels::locked_eventNewMsg(RsDistribMsg *msg)
/* only download in the first week of channel
* older stuff can be manually downloaded.
*/
const uint32_t DOWNLOAD_PERIOD = 7 * 24 * 3600;
bool p3Channels::locked_eventDuplicateMsg(GroupInfo *grp, RsDistribMsg *msg, std::string id)
{
std::string grpId = msg->grpId;
std::string msgId = msg->msgId;
std::string nullId;
getPqiNotify()->AddFeedItem(RS_FEED_ITEM_CHAN_MSG, grpId, msgId, nullId);
std::cerr << "p3Channels::locked_eventDuplicateMsg() ";
std::cerr << " grpId: " << grpId << " msgId: " << msgId;
std::cerr << " peerId: " << id;
std::cerr << std::endl;
RsChannelMsg *chanMsg = dynamic_cast<RsChannelMsg *>(msg);
if (!chanMsg)
{
return true;
}
/* request the files
* NB: This will result in duplicates.
* it is upto ftserver/ftcontroller/ftextralist
*
* download, then add to
*
* */
//bool download = (grp->flags & (RS_DISTRIB_ADMIN |
// RS_DISTRIB_PUBLISH | RS_DISTRIB_SUBSCRIBED))
bool download = (grp->flags & RS_DISTRIB_SUBSCRIBED);
/* check subscribed */
if (!download)
{
return true;
}
/* check age */
time_t age = time(NULL) - msg->timestamp;
if (age > DOWNLOAD_PERIOD)
{
return true;
}
/* Iterate through files */
std::list<RsTlvFileItem>::iterator fit;
for(fit = chanMsg->attachment.items.begin();
fit != chanMsg->attachment.items.end(); fit++)
{
std::string fname = fit->name;
std::string hash = fit->hash;
uint64_t size = fit->filesize;
std::string channelname = grpId;
std::string localpath = mChannelsDir + "/" + channelname;
uint32_t flags = RS_FILE_HINTS_EXTRA;
std::list<std::string> srcIds;
srcIds.push_back(id);
/* download it ... and flag for ExtraList
* don't do pre-search check as FileRequest does it better
*/
std::cerr << "p3Channels::locked_eventDuplicateMsg() ";
std::cerr << " Downloading: " << fname;
std::cerr << " to: " << localpath;
std::cerr << " from: " << id;
std::cerr << std::endl;
mRsFiles->FileRequest(fname, hash, size,
localpath, flags, srcIds);
}
return true;
}
bool p3Channels::locked_eventNewMsg(GroupInfo *grp, RsDistribMsg *msg, std::string id)
{
std::string grpId = msg->grpId;
std::string msgId = msg->msgId;
std::string nullId;
std::cerr << "p3Channels::locked_eventNewMsg() ";
std::cerr << " grpId: " << grpId;
std::cerr << " msgId: " << msgId;
std::cerr << " peerId: " << id;
std::cerr << std::endl;
getPqiNotify()->AddFeedItem(RS_FEED_ITEM_CHAN_MSG, grpId, msgId, nullId);
/* request the files
* NB: This could result in duplicates.
* which must be handled by ft side.
*
* this is exactly what DuplicateMsg does.
* */
return locked_eventDuplicateMsg(grp, msg, id);
}
/****************************************/

View file

@ -27,6 +27,7 @@
*/
#include "rsiface/rschannels.h"
#include "rsiface/rsfiles.h"
#include "services/p3distrib.h"
#include "serialiser/rstlvtypes.h"
@ -37,7 +38,7 @@ class p3Channels: public p3GroupDistrib, public RsChannels
{
public:
p3Channels(uint16_t type, CacheStrapper *cs, CacheTransfer *cft,
p3Channels(uint16_t type, CacheStrapper *cs, CacheTransfer *cft, RsFiles *files,
std::string srcdir, std::string storedir, std::string channelsdir);
virtual ~p3Channels();
@ -62,7 +63,8 @@ virtual bool channelSubscribe(std::string cId, bool subscribe);
/***************************************************************************************/
virtual bool locked_eventUpdateGroup(GroupInfo *, bool isNew);
virtual bool locked_eventNewMsg(RsDistribMsg *);
virtual bool locked_eventNewMsg(GroupInfo *, RsDistribMsg *, std::string);
virtual bool locked_eventDuplicateMsg(GroupInfo *, RsDistribMsg *, std::string);
/****************************************/
/********* Overloaded Functions *********/
@ -78,6 +80,7 @@ virtual RsDistribGrp *locked_createPrivateDistribGrp(GroupInfo &info);
private:
RsFiles *mRsFiles;
std::string mChannelsDir;
};

View file

@ -561,19 +561,6 @@ void p3GroupDistrib::loadMsg(RsDistribSignedMsg *newMsg, std::string src, bool l
return;
}
/* check for duplicate message */
std::map<std::string, RsDistribMsg *>::iterator mit;
if ((git->second).msgs.end() != (git->second).msgs.find(newMsg->msgId))
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadMsg() Msg already exists" << std::endl;
std::cerr << std::endl;
#endif
/* if already there -> remove */
delete newMsg;
return;
}
/****************** check the msg ******************/
if (!locked_validateDistribSignedMsg(git->second, newMsg))
{
@ -585,6 +572,26 @@ void p3GroupDistrib::loadMsg(RsDistribSignedMsg *newMsg, std::string src, bool l
return;
}
/* check for duplicate message
*
* do this after validate - because we are calling
* duplicateMsg... only want to do if is good.
*/
std::map<std::string, RsDistribMsg *>::iterator mit;
mit = (git->second).msgs.find(newMsg->msgId);
if (mit != (git->second).msgs.end())
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadMsg() Msg already exists" << std::endl;
std::cerr << std::endl;
#endif
/* if already there -> remove */
locked_eventDuplicateMsg(&(git->second), mit->second, src);
delete newMsg;
return;
}
/* convert Msg */
RsDistribMsg *msg = unpackDistribSignedMsg(newMsg);
if (!msg)
@ -617,7 +624,7 @@ void p3GroupDistrib::loadMsg(RsDistribSignedMsg *newMsg, std::string src, bool l
#endif
/* Callback for any derived classes to play with */
locked_eventNewMsg(msg);
locked_eventNewMsg(&(git->second), msg, src);
/* else if group = subscribed | listener -> publish */
/* if it has come from us... then it has been published already */

View file

@ -280,7 +280,8 @@ RsDistribMsg *locked_getGroupMsg(std::string grpId, std::string msgId);
/***************************************************************************************/
virtual bool locked_eventUpdateGroup(GroupInfo *, bool isNew) = 0;
virtual bool locked_eventNewMsg(RsDistribMsg *) = 0;
virtual bool locked_eventDuplicateMsg(GroupInfo *, RsDistribMsg *, std::string id) = 0;
virtual bool locked_eventNewMsg(GroupInfo *, RsDistribMsg *, std::string id) = 0;
/***************************************************************************************/
/********************************* p3Config ********************************************/

View file

@ -376,8 +376,12 @@ bool p3Forums::locked_eventUpdateGroup(GroupInfo *info, bool isNew)
return true;
}
bool p3Forums::locked_eventDuplicateMsg(GroupInfo *grp, RsDistribMsg *msg, std::string id)
{
return true;
}
bool p3Forums::locked_eventNewMsg(RsDistribMsg *msg)
bool p3Forums::locked_eventNewMsg(GroupInfo *grp, RsDistribMsg *msg, std::string id)
{
std::string grpId = msg->grpId;
std::string msgId = msg->msgId;

View file

@ -100,7 +100,9 @@ virtual bool forumSubscribe(std::string fId, bool subscribe);
/***************************************************************************************/
virtual bool locked_eventUpdateGroup(GroupInfo *, bool isNew);
virtual bool locked_eventNewMsg(RsDistribMsg *);
virtual bool locked_eventDuplicateMsg(GroupInfo *, RsDistribMsg *, std::string);
virtual bool locked_eventNewMsg(GroupInfo *, RsDistribMsg *, std::string);
/****************************************/