diff --git a/libretroshare/src/ft/ftcontroller.cc b/libretroshare/src/ft/ftcontroller.cc index 1d6349734..e0ea89c26 100644 --- a/libretroshare/src/ft/ftcontroller.cc +++ b/libretroshare/src/ft/ftcontroller.cc @@ -73,7 +73,7 @@ ftFileControl::ftFileControl(std::string fname, } ftController::ftController(CacheStrapper *cs, ftDataMultiplex *dm, std::string configDir) - :CacheTransfer(cs), p3Config(CONFIG_TYPE_FT_CONTROL), mDataplex(dm) + :CacheTransfer(cs), p3Config(CONFIG_TYPE_FT_CONTROL), mDataplex(dm), mFtActive(false) { /* TODO */ } @@ -100,6 +100,20 @@ void ftController::run() //std::cerr << "ftController::run()"; //std::cerr << std::endl; #endif + bool doPending = false; + { + RsStackMutex stack(ctrlMutex); /******* LOCKED ********/ + doPending = (mFtActive) && (!mFtPendingDone); + } + + if (doPending) + { + if (!handleAPendingRequest()) + { + RsStackMutex stack(ctrlMutex); /******* LOCKED ********/ + mFtPendingDone = true; + } + } /* tick the transferModules */ std::list done; @@ -124,6 +138,7 @@ void ftController::run() completeFile(*it); } mDone.clear(); + } } @@ -329,10 +344,51 @@ bool ftController::completeFile(std::string hash) const uint32_t FT_CNTRL_STANDARD_RATE = 1024 * 1024; const uint32_t FT_CNTRL_SLOW_RATE = 10 * 1024; +bool ftController::activate() +{ + RsStackMutex stack(ctrlMutex); /******* LOCKED ********/ + mFtActive = true; + mFtPendingDone = false; + return true; +} + +bool ftController::handleAPendingRequest() +{ + ftPendingRequest req; + { RsStackMutex stack(ctrlMutex); /******* LOCKED ********/ + + if (mPendingRequests.size() < 1) + { + return false; + } + req = mPendingRequests.front(); + mPendingRequests.pop_front(); + } + FileRequest(req.mName, req.mHash, req.mSize, req.mDest, req.mFlags, req.mSrcIds); + return true; +} + + bool ftController::FileRequest(std::string fname, std::string hash, uint64_t size, std::string dest, uint32_t flags, std::list &srcIds) { + /* If file transfer is not enabled .... + * save request for later. This will also + * mean that we will have to copy local files, + * or have a callback which says: local file. + */ + + { RsStackMutex stack(ctrlMutex); /******* LOCKED ********/ + if (!mFtActive) + { + /* store in pending queue */ + ftPendingRequest req(fname, hash, size, dest, flags, srcIds); + mPendingRequests.push_back(req); + return true; + } + } + /* check if we have the file */ FileInfo info; std::list::iterator it; @@ -1070,22 +1126,22 @@ bool ftController::loadList(std::list load) } loadConfigMap(configMap); - /* cleanup */ - delete (*it); } else if (NULL != (rsft = dynamic_cast(*it))) { RsStackMutex stack(ctrlMutex); /******* LOCKED ********/ - /* save to the preLoad list */ - mResumeTransferList.push_back(rsft); - } - else - { - /* cleanup */ - delete (*it); + /* This will get stored on a waiting list - until the + * config files are fully loaded + */ + FileRequest(rsft->file.name, rsft->file.hash, rsft->file.filesize, + rsft->file.path, 0, rsft->allPeerIds.ids); + } + + /* cleanup */ + delete (*it); } return true; @@ -1112,31 +1168,3 @@ bool ftController::loadConfigMap(std::map &configMap) return true; } - -bool ftController::ResumeTransfers() -{ - std::list resumeList; - std::list::iterator it; - - { RsStackMutex stack(ctrlMutex); /******* LOCKED ********/ - resumeList = mResumeTransferList; - mResumeTransferList.clear(); - } - - for(it = resumeList.begin(); it != resumeList.end(); it++) - { - /* do File request */ - std::string fname = (*it)->file.name; - std::string hash = (*it)->file.hash; - uint64_t size = (*it)->file.filesize; - std::string dest = (*it)->file.path; - uint32_t flags = 0; //(*it)->flags; - std::list srcIds = (*it)->allPeerIds.ids; - - FileRequest(fname,hash,size,dest,flags,srcIds); - - delete (*it); - } - return true; -} - diff --git a/libretroshare/src/ft/ftcontroller.h b/libretroshare/src/ft/ftcontroller.h index 6e87e7728..9350805b5 100644 --- a/libretroshare/src/ft/ftcontroller.h +++ b/libretroshare/src/ft/ftcontroller.h @@ -86,6 +86,25 @@ class ftFileControl uint32_t mCallbackCode; }; +class ftPendingRequest +{ + public: + ftPendingRequest(std::string fname, std::string hash, + uint64_t size, std::string dest, uint32_t flags, + std::list &srcIds) + : mName(fname), mHash(hash), mSize(size), + mDest(dest), mFlags(flags),mSrcIds(srcIds) { return; } + + ftPendingRequest() : mSize(0), mFlags(0) { return; } + + std::string mName; + std::string mHash; + uint64_t mSize; + std::string mDest; + uint32_t mFlags; + std::list mSrcIds; +}; + class ftController: public CacheTransfer, public RsThread, public pqiMonitor, public p3Config { @@ -95,7 +114,7 @@ class ftController: public CacheTransfer, public RsThread, public pqiMonitor, pu ftController(CacheStrapper *cs, ftDataMultiplex *dm, std::string configDir); void setFtSearchNExtra(ftSearch *, ftExtraList *); -bool ResumeTransfers(); +bool activate(); virtual void run(); @@ -154,6 +173,7 @@ bool loadConfigMap(std::map &configMap); /* RunTime Functions */ void checkDownloadQueue(); bool completeFile(std::string hash); +bool handleAPendingRequest(); bool setPeerState(ftTransferModule *tm, std::string id, uint32_t maxrate, bool online); @@ -184,13 +204,14 @@ bool setPeerState(ftTransferModule *tm, std::string id, std::list mStreamQueue; std::list mFastQueue; - /* Config Load */ - std::list mResumeTransferList; - /* callback list (for File Completion) */ RsMutex doneMutex; std::list mDone; + + /* List to Pause File transfers until Caches are properly loaded */ + bool mFtActive; + bool mFtPendingDone; + std::list mPendingRequests; }; #endif - diff --git a/libretroshare/src/ft/ftfilecreator.cc b/libretroshare/src/ft/ftfilecreator.cc index bc232ac4a..4f9b53dbd 100644 --- a/libretroshare/src/ft/ftfilecreator.cc +++ b/libretroshare/src/ft/ftfilecreator.cc @@ -201,6 +201,10 @@ int ftFileCreator::initializeFileAttrs() uint64_t recvdsize = ftell(fd); std::cerr << "ftFileCreator::initializeFileAttrs() File Expected Size: " << mSize << " RecvdSize: " << recvdsize << std::endl; + + /* start from there! */ + mStart = recvdsize; + mEnd = recvdsize; return 1; } diff --git a/libretroshare/src/ft/ftserver.cc b/libretroshare/src/ft/ftserver.cc index f9f631eb8..abd35c9da 100644 --- a/libretroshare/src/ft/ftserver.cc +++ b/libretroshare/src/ft/ftserver.cc @@ -794,7 +794,7 @@ bool ftServer::addConfiguration(p3ConfigMgr *cfgmgr) bool ftServer::ResumeTransfers() { - mFtController->ResumeTransfers(); + mFtController->activate(); return true; } diff --git a/libretroshare/src/ft/fttransfermodule.cc b/libretroshare/src/ft/fttransfermodule.cc index 3916ef933..5e81fb8be 100644 --- a/libretroshare/src/ft/fttransfermodule.cc +++ b/libretroshare/src/ft/fttransfermodule.cc @@ -148,7 +148,7 @@ bool ftTransferModule::addFileSource(std::string peerId) std::cerr << std::endl; #endif } - + return true; } @@ -542,7 +542,7 @@ bool ftTransferModule::locked_tickPeerTransfer(peerInfo &info) return false; } - if (ageReq > FT_TM_RESTART_DOWNLOAD * (info.nResets + 1)) + if (ageReq > (int) (FT_TM_RESTART_DOWNLOAD * (info.nResets + 1))) { if (info.nResets > 1) /* 3rd timeout */ { @@ -569,7 +569,7 @@ bool ftTransferModule::locked_tickPeerTransfer(peerInfo &info) } } - if (ageRecv > FT_TM_DOWNLOAD_TIMEOUT) + if (ageRecv > (int) FT_TM_DOWNLOAD_TIMEOUT) { info.state = PQIPEER_IDLE; return false; diff --git a/libretroshare/src/scripts/makeMacUniversalLibrary.sh b/libretroshare/src/scripts/makeMacUniversalLibrary.sh index 57c8b9455..ee6d3c59d 100755 --- a/libretroshare/src/scripts/makeMacUniversalLibrary.sh +++ b/libretroshare/src/scripts/makeMacUniversalLibrary.sh @@ -13,14 +13,14 @@ LIB_X86=libretroshare_x86.a MAC_SCRIPT="./scripts/config-macosx.mk" -echo cp lib/$LIB lib/$LIB_PPC -cp lib/$LIB lib/$LIB_PPC +echo cp lib/$LIB lib/$LIB_PPC librs +cp lib/$LIB lib/$LIB_PPC librs echo make clobber make clobber -echo make "MAC_I386_BUILD=1" -make "MAC_I386_BUILD=1" +echo make "MAC_I386_BUILD=1" librs +make "MAC_I386_BUILD=1" librs echo cp lib/$LIB lib/$LIB_X86 cp lib/$LIB lib/$LIB_X86 diff --git a/libretroshare/src/services/p3channels.cc b/libretroshare/src/services/p3channels.cc index bf67503f2..4fa6d9bde 100644 --- a/libretroshare/src/services/p3channels.cc +++ b/libretroshare/src/services/p3channels.cc @@ -294,58 +294,16 @@ bool p3Channels::channelSubscribe(std::string cId, bool subscribe) /***************************************************************************************/ /****************** Event Feedback (Overloaded form p3distrib) *************************/ /***************************************************************************************/ - -#include "pqi/pqinotify.h" - -bool p3Channels::locked_eventUpdateGroup(GroupInfo *info, bool isNew) -{ - std::string grpId = info->grpId; - std::string msgId; - std::string nullId; - - std::cerr << "p3Channels::locked_eventUpdateGroup() "; - std::cerr << grpId; - std::cerr << " flags:" << info->flags; - std::cerr << std::endl; - - if (isNew) - { - getPqiNotify()->AddFeedItem(RS_FEED_ITEM_CHAN_NEW, grpId, msgId, nullId); - } - else - { - getPqiNotify()->AddFeedItem(RS_FEED_ITEM_CHAN_UPDATE, grpId, msgId, nullId); - } - - if (info->flags & RS_DISTRIB_SUBSCRIBED) - // || (info->flags & RS_DISTRIB_SUBSCRIBED)) - { - std::string channeldir = mChannelsDir + "/" + grpId; - - std::cerr << "p3Channels::locked_eventUpdateGroup() "; - std::cerr << " creating directory: " << channeldir; - std::cerr << std::endl; - - /* create chanDir */ - if (!RsDirUtil::checkCreateDirectory(channeldir)) - { - std::cerr << "p3Channels::locked_eventUpdateGroup() "; - std::cerr << "Failed to create Channels Directory: "; - std::cerr << channeldir; - std::cerr << std::endl; - } - } - - - return true; -} - /* only download in the first week of channel * older stuff can be manually downloaded. */ const uint32_t DOWNLOAD_PERIOD = 7 * 24 * 3600; +/* This is called when we receive a msg, and also recalled + * on a subscription to a channel.. + */ + bool p3Channels::locked_eventDuplicateMsg(GroupInfo *grp, RsDistribMsg *msg, std::string id) { std::string grpId = msg->grpId; @@ -368,9 +326,6 @@ bool p3Channels::locked_eventDuplicateMsg(GroupInfo *grp, RsDistribMsg *msg, std /* request the files * NB: This will result in duplicates. * it is upto ftserver/ftcontroller/ftextralist - * - * download, then add to - * * */ //bool download = (grp->flags & (RS_DISTRIB_ADMIN | @@ -408,6 +363,8 @@ bool p3Channels::locked_eventDuplicateMsg(GroupInfo *grp, RsDistribMsg *msg, std /* download it ... and flag for ExtraList * don't do pre-search check as FileRequest does it better + * + * FileRequest will ignore request if file is already indexed. */ std::cerr << "p3Channels::locked_eventDuplicateMsg() "; @@ -424,6 +381,7 @@ bool p3Channels::locked_eventDuplicateMsg(GroupInfo *grp, RsDistribMsg *msg, std return true; } +#include "pqi/pqinotify.h" bool p3Channels::locked_eventNewMsg(GroupInfo *grp, RsDistribMsg *msg, std::string id) { @@ -449,31 +407,62 @@ bool p3Channels::locked_eventNewMsg(GroupInfo *grp, RsDistribMsg *msg, std::stri } -void p3Channels::locked_notifyGroupChanged(GroupInfo &grp) + + +void p3Channels::locked_notifyGroupChanged(GroupInfo &grp, uint32_t flags) { - /* create directory if needed */ - if (grp.flags & RS_DISTRIB_SUBSCRIBED) + std::string grpId = grp.grpId; + std::string msgId; + std::string nullId; + + std::cerr << "p3Channels::locked_eventUpdateGroup() "; + std::cerr << grpId; + std::cerr << " flags:" << grp.flags; + std::cerr << std::endl; + + switch(flags) { - std::string channeldir = mChannelsDir + "/" + grp.grpId; + case GRP_NEW_UPDATE: + getPqiNotify()->AddFeedItem(RS_FEED_ITEM_CHAN_NEW, grpId, msgId, nullId); + break; + case GRP_UPDATE: + getPqiNotify()->AddFeedItem(RS_FEED_ITEM_CHAN_UPDATE, grpId, msgId, nullId); + break; + case GRP_LOAD_KEY: + break; + case GRP_NEW_MSG: + break; + case GRP_SUBSCRIBED: + break; + + { + std::string channeldir = mChannelsDir + "/" + grpId; + + std::cerr << "p3Channels::locked_notifyGroupChanged() "; + std::cerr << " creating directory: " << channeldir; + std::cerr << std::endl; /* create chanDir */ if (!RsDirUtil::checkCreateDirectory(channeldir)) { - std::cerr << "p3Channels::channelSubscribe()"; - std::cerr << " Failed to create Channels Directory: "; - std::cerr << channeldir; - std::cerr << std::endl; - } - else - { - std::cerr << "p3Channels::channelSubscribe()"; - std::cerr << " Created: "; + std::cerr << "p3Channels::locked_notifyGroupChanged() "; + std::cerr << "Failed to create Channels Directory: "; std::cerr << channeldir; std::cerr << std::endl; } + + /* check if downloads need to be started? */ } - return p3GroupDistrib::locked_notifyGroupChanged(grp); + break; + case GRP_UNSUBSCRIBED: + + /* won't stop downloads... */ + + break; + } + + return p3GroupDistrib::locked_notifyGroupChanged(grp, flags); } diff --git a/libretroshare/src/services/p3channels.h b/libretroshare/src/services/p3channels.h index cf103b70d..c396edeff 100644 --- a/libretroshare/src/services/p3channels.h +++ b/libretroshare/src/services/p3channels.h @@ -63,7 +63,7 @@ virtual bool channelSubscribe(std::string cId, bool subscribe); /***************************************************************************************/ protected: -virtual bool locked_eventUpdateGroup(GroupInfo *, bool isNew); +virtual void locked_notifyGroupChanged(GroupInfo &info, uint32_t flags); virtual bool locked_eventNewMsg(GroupInfo *, RsDistribMsg *, std::string); virtual bool locked_eventDuplicateMsg(GroupInfo *, RsDistribMsg *, std::string); @@ -77,7 +77,6 @@ virtual bool locked_checkDistribMsg(RsDistribMsg *msg); virtual RsDistribGrp *locked_createPublicDistribGrp(GroupInfo &info); virtual RsDistribGrp *locked_createPrivateDistribGrp(GroupInfo &info); -virtual void locked_notifyGroupChanged(GroupInfo &info); /****************************************/ diff --git a/libretroshare/src/services/p3distrib.cc b/libretroshare/src/services/p3distrib.cc index cf0e656af..5935729fb 100644 --- a/libretroshare/src/services/p3distrib.cc +++ b/libretroshare/src/services/p3distrib.cc @@ -434,9 +434,11 @@ void p3GroupDistrib::loadGroup(RsDistribGrp *newGrp) else { /* Callback for any derived classes */ - locked_eventUpdateGroup(&(it->second), isNew); - locked_notifyGroupChanged(it->second); + if (isNew) + locked_notifyGroupChanged(it->second, GRP_NEW_UPDATE); + else + locked_notifyGroupChanged(it->second, GRP_UPDATE); } #ifdef DISTRIB_DEBUG @@ -511,7 +513,7 @@ void p3GroupDistrib::loadGroupKey(RsDistribGrpKey *newKey) if (updateOk) { - locked_notifyGroupChanged(it->second); + locked_notifyGroupChanged(it->second, GRP_LOAD_KEY); } #ifdef DISTRIB_DEBUG @@ -638,14 +640,18 @@ void p3GroupDistrib::loadMsg(RsDistribSignedMsg *newMsg, std::string src, bool l } else { + /* Note it makes it very difficult to republish msg - if we have + * deleted the signed version... The load of old messages will occur + * at next startup. And publication will happen then too. + */ + #ifdef DISTRIB_DEBUG std::cerr << "p3GroupDistrib::loadMsg() Deleted Original Msg (No Publish)"; std::cerr << std::endl; #endif delete newMsg; } - - locked_notifyGroupChanged(git->second); + locked_notifyGroupChanged(git->second, GRP_NEW_MSG); } @@ -1112,8 +1118,28 @@ bool p3GroupDistrib::subscribeToGroup(std::string grpId, bool subscribe) { git->second.flags |= RS_DISTRIB_SUBSCRIBED; - locked_notifyGroupChanged(git->second); + locked_notifyGroupChanged(git->second, GRP_SUBSCRIBED); mGroupsRepublish = true; + + /* reprocess groups messages .... so actions can be taken (by inherited) + * This could be an very expensive operation! .... but they asked for it. + * + * Hopefully a LoadList call will have on existing messages! + */ + + std::map::iterator mit; + std::list::iterator pit; + + /* assume that each peer can provide all of them */ + for(mit = git->second.msgs.begin(); + mit != git->second.msgs.end(); mit++) + { + for(pit = git->second.sources.begin(); + pit != git->second.sources.end(); pit++) + { + locked_eventDuplicateMsg(&(git->second), mit->second, *pit); + } + } } } else @@ -1122,7 +1148,7 @@ bool p3GroupDistrib::subscribeToGroup(std::string grpId, bool subscribe) { git->second.flags &= (~RS_DISTRIB_SUBSCRIBED); - locked_notifyGroupChanged(git->second); + locked_notifyGroupChanged(git->second, GRP_UNSUBSCRIBED); mGroupsRepublish = true; } } @@ -2562,7 +2588,7 @@ std::ostream &operator<<(std::ostream &out, const GroupInfo &info) return out; } -void p3GroupDistrib::locked_notifyGroupChanged(GroupInfo &info) +void p3GroupDistrib::locked_notifyGroupChanged(GroupInfo &info, uint32_t flags) { mGroupsChanged = true; info.grpChanged = true; diff --git a/libretroshare/src/services/p3distrib.h b/libretroshare/src/services/p3distrib.h index bf7655ed3..954854e7a 100644 --- a/libretroshare/src/services/p3distrib.h +++ b/libretroshare/src/services/p3distrib.h @@ -194,6 +194,14 @@ class GroupCache uint16_t cacheSubId; }; + /* Flags for locked_notifyGroupChanged() ***/ + +const uint32_t GRP_NEW_UPDATE = 0x0001; +const uint32_t GRP_UPDATE = 0x0002; +const uint32_t GRP_LOAD_KEY = 0x0003; +const uint32_t GRP_NEW_MSG = 0x0004; +const uint32_t GRP_SUBSCRIBED = 0x0005; +const uint32_t GRP_UNSUBSCRIBED = 0x0006; class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, public nullService { @@ -279,7 +287,9 @@ RsDistribMsg *locked_getGroupMsg(std::string grpId, std::string msgId); /***************************** Event Feedback ******************************************/ /***************************************************************************************/ -virtual bool locked_eventUpdateGroup(GroupInfo *, bool isNew) = 0; + protected: + /* root version of this function must be called */ +virtual void locked_notifyGroupChanged(GroupInfo &info, uint32_t flags); virtual bool locked_eventDuplicateMsg(GroupInfo *, RsDistribMsg *, std::string id) = 0; virtual bool locked_eventNewMsg(GroupInfo *, RsDistribMsg *, std::string id) = 0; @@ -347,9 +357,6 @@ virtual bool locked_choosePublishKey(GroupInfo &info); //virtual RsDistribGrp *locked_createPublicDistribGrp(GroupInfo &info); //virtual RsDistribGrp *locked_createPrivateDistribGrp(GroupInfo &info); - protected: - /* root version of this function must be called */ -virtual void locked_notifyGroupChanged(GroupInfo &info); /***************************************************************************************/ /***************************** Utility Functions ***************************************/ diff --git a/libretroshare/src/services/p3forums.cc b/libretroshare/src/services/p3forums.cc index e045439e5..b60b5de93 100644 --- a/libretroshare/src/services/p3forums.cc +++ b/libretroshare/src/services/p3forums.cc @@ -362,23 +362,30 @@ bool p3Forums::forumSubscribe(std::string fId, bool subscribe) #include "pqi/pqinotify.h" -bool p3Forums::locked_eventUpdateGroup(GroupInfo *info, bool isNew) +void p3Forums::locked_notifyGroupChanged(GroupInfo &grp, uint32_t flags) { - std::string grpId = info->grpId; + std::string grpId = grp.grpId; std::string msgId; std::string nullId; - if (isNew) - { - getPqiNotify()->AddFeedItem(RS_FEED_ITEM_FORUM_NEW, grpId, msgId, nullId); - } - else - { - getPqiNotify()->AddFeedItem(RS_FEED_ITEM_FORUM_UPDATE, grpId, msgId, nullId); - } - - return true; + switch(flags) + { + case GRP_NEW_UPDATE: + getPqiNotify()->AddFeedItem(RS_FEED_ITEM_FORUM_NEW, grpId, msgId, nullId); + break; + case GRP_UPDATE: + getPqiNotify()->AddFeedItem(RS_FEED_ITEM_FORUM_UPDATE, grpId, msgId, nullId); + break; + case GRP_LOAD_KEY: + break; + case GRP_NEW_MSG: + break; + case GRP_SUBSCRIBED: + break; + } + return p3GroupDistrib::locked_notifyGroupChanged(grp, flags); } + bool p3Forums::locked_eventDuplicateMsg(GroupInfo *grp, RsDistribMsg *msg, std::string id) { return true; diff --git a/libretroshare/src/services/p3forums.h b/libretroshare/src/services/p3forums.h index 551d304cb..f3709e4a5 100644 --- a/libretroshare/src/services/p3forums.h +++ b/libretroshare/src/services/p3forums.h @@ -99,7 +99,7 @@ virtual bool forumSubscribe(std::string fId, bool subscribe); /****************** Event Feedback (Overloaded form p3distrib) *************************/ /***************************************************************************************/ -virtual bool locked_eventUpdateGroup(GroupInfo *, bool isNew); +virtual void locked_notifyGroupChanged(GroupInfo &grp, uint32_t flags); virtual bool locked_eventDuplicateMsg(GroupInfo *, RsDistribMsg *, std::string); virtual bool locked_eventNewMsg(GroupInfo *, RsDistribMsg *, std::string);