From e9b9dce9f539b41a434da36f43f8edc79202dcb1 Mon Sep 17 00:00:00 2001 From: csoler Date: Fri, 22 May 2015 20:54:38 +0000 Subject: [PATCH] created 2 subclasses of RsThread, one for ticking services, and one for single shot jobs. Now all threads use the same base code. git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@8288 b45a01b8-16f6-495d-af2f-9b41ad6348cc --- libresapi/src/api/RsControlModule.cpp | 3 +- libresapi/src/api/RsControlModule.h | 3 +- libretroshare/src/dbase/fimonitor.cc | 17 +- libretroshare/src/dbase/fimonitor.h | 4 +- libretroshare/src/ft/ftcontroller.cc | 9 +- libretroshare/src/ft/ftcontroller.h | 5 +- libretroshare/src/ft/ftextralist.cc | 67 ++--- libretroshare/src/ft/ftextralist.h | 6 +- libretroshare/src/ft/fttransfermodule.cc | 6 +- libretroshare/src/gxs/rsgenexchange.cc | 10 +- libretroshare/src/gxs/rsgenexchange.h | 4 +- libretroshare/src/gxs/rsgxsnetservice.cc | 16 +- libretroshare/src/gxs/rsgxsnetservice.h | 3 +- libretroshare/src/gxs/rsgxsutil.h | 6 +- libretroshare/src/pqi/authgpg.cc | 37 ++- libretroshare/src/pqi/authgpg.h | 7 +- libretroshare/src/pqi/pqithreadstreamer.cc | 5 +- libretroshare/src/pqi/pqithreadstreamer.h | 26 +- libretroshare/src/rsserver/p3face-config.cc | 10 +- libretroshare/src/rsserver/p3face-server.cc | 274 ++++++++---------- libretroshare/src/rsserver/p3face.h | 18 +- libretroshare/src/services/p3service.h | 2 +- libretroshare/src/util/rsthreads.cc | 55 ++-- libretroshare/src/util/rsthreads.h | 45 ++- .../FeedReader/services/p3FeedReaderThread.cc | 6 +- .../FeedReader/services/p3FeedReaderThread.h | 4 +- retroshare-nogui/src/TerminalApiClient.cpp | 2 +- retroshare-nogui/src/TerminalApiClient.h | 2 +- 28 files changed, 317 insertions(+), 335 deletions(-) diff --git a/libresapi/src/api/RsControlModule.cpp b/libresapi/src/api/RsControlModule.cpp index a38a6b5f0..dfd59a574 100644 --- a/libresapi/src/api/RsControlModule.cpp +++ b/libresapi/src/api/RsControlModule.cpp @@ -43,8 +43,7 @@ RsControlModule::RsControlModule(int argc, char **argv, StateTokenServer* sts, A RsControlModule::~RsControlModule() { - if(isRunning()) - join(); +// join(); } bool RsControlModule::processShouldExit() diff --git a/libresapi/src/api/RsControlModule.h b/libresapi/src/api/RsControlModule.h index b36144957..0619ab6cb 100644 --- a/libresapi/src/api/RsControlModule.h +++ b/libresapi/src/api/RsControlModule.h @@ -19,8 +19,7 @@ class ApiServer; // - handle password callback // - confirm plugin loading // - shutdown retroshare -class RsControlModule: public ResourceRouter, NotifyClient, - private RsThread +class RsControlModule: public ResourceRouter, NotifyClient, private RsSingleJobThread { public: // ApiServer will be called once RS is started, to load additional api modules diff --git a/libretroshare/src/dbase/fimonitor.cc b/libretroshare/src/dbase/fimonitor.cc index e08c2b077..a67339f1e 100644 --- a/libretroshare/src/dbase/fimonitor.cc +++ b/libretroshare/src/dbase/fimonitor.cc @@ -61,8 +61,8 @@ FileIndexMonitor::FileIndexMonitor(CacheStrapper *cs, std::string cachedir, cons mForceCheck(false), mInCheck(false), hashCache(config_dir+"/" + "file_cache"),useHashCache(true) { - updatePeriod = 15 * 60; // 15 minutes - reference_time = 0 ; + updatePeriod = 15 * 60; // 15 minutes + reference_time = 0 ; } bool FileIndexMonitor::autoCheckEnabled() const @@ -622,15 +622,13 @@ void FileIndexMonitor::setPeriod(int period) #endif } -void FileIndexMonitor::run() +void FileIndexMonitor::data_tick() { - if(autoCheckEnabled()) - updateCycle(); + if(autoCheckEnabled()) + updateCycle(); - while(isRunning()) - { - int i=0 ; - for(;;++i) + int i=0 ; + for(;;++i) { if(!isRunning()) return; @@ -651,7 +649,6 @@ void FileIndexMonitor::run() if(i < abs(updatePeriod) || autoCheckEnabled()) updateCycle(); - } } void FileIndexMonitor::updateCycle() diff --git a/libretroshare/src/dbase/fimonitor.h b/libretroshare/src/dbase/fimonitor.h index 3a6b4d9c3..dc616625d 100644 --- a/libretroshare/src/dbase/fimonitor.h +++ b/libretroshare/src/dbase/fimonitor.h @@ -103,7 +103,7 @@ class HashCache * FileIndexMonitor *****************************************************************************************/ -class FileIndexMonitor: public CacheSource, public RsThread +class FileIndexMonitor: public CacheSource, public RsTickingThread { public: FileIndexMonitor(CacheStrapper *cs, std::string cachedir, const RsPeerId& pid, const std::string& config_dir); @@ -131,7 +131,7 @@ class FileIndexMonitor: public CacheSource, public RsThread /* the FileIndexMonitor inner workings */ //virtual void run(std::string& currentJob); /* overloaded from RsThread */ //void updateCycle(std::string& currentJob); - virtual void run(); /* overloaded from RsThread */ + virtual void data_tick(); /* overloaded from RsThread */ void updateCycle(); // Interface for browsing dir hirarchy diff --git a/libretroshare/src/ft/ftcontroller.cc b/libretroshare/src/ft/ftcontroller.cc index 704bdf46f..7e38fa161 100644 --- a/libretroshare/src/ft/ftcontroller.cc +++ b/libretroshare/src/ft/ftcontroller.cc @@ -115,6 +115,7 @@ ftController::ftController(CacheStrapper *cs, ftDataMultiplex *dm, p3ServiceCont _max_active_downloads = 5 ; // default queue size _min_prioritized_transfers = 3 ; /* TODO */ + cnt = 0 ; } void ftController::setTurtleRouter(p3turtle *pt) { mTurtle = pt ; } @@ -209,14 +210,10 @@ void ftController::removeFileSource(const RsFileHash& hash,const RsPeerId& peer_ std::cerr << "... not added: hash not found." << std::endl ; #endif } -void ftController::run() +void ftController::data_tick() { - /* check the queues */ - uint32_t cnt = 0 ; - while(isRunning()) - { //Waiting 1 sec before start usleep(1*1000*1000); // 1 sec @@ -276,8 +273,6 @@ void ftController::run() if(cnt++ % 10 == 0) checkDownloadQueue() ; - } - } void ftController::searchForDirectSources() diff --git a/libretroshare/src/ft/ftcontroller.h b/libretroshare/src/ft/ftcontroller.h index f78b66d86..fe20a7845 100644 --- a/libretroshare/src/ft/ftcontroller.h +++ b/libretroshare/src/ft/ftcontroller.h @@ -113,7 +113,7 @@ class ftPendingRequest }; -class ftController: public CacheTransfer, public RsThread, public pqiServiceMonitor, public p3Config +class ftController: public CacheTransfer, public RsTickingThread, public pqiServiceMonitor, public p3Config { public: @@ -126,7 +126,7 @@ class ftController: public CacheTransfer, public RsThread, public pqiServiceMoni bool activate(); bool isActiveAndNoPending(); - virtual void run(); + virtual void data_tick(); /***************************************************************/ /********************** Controller Access **********************/ @@ -243,6 +243,7 @@ class ftController: public CacheTransfer, public RsThread, public pqiServiceMoni p3ServiceControl *mServiceCtrl; uint32_t mFtServiceId; + uint32_t cnt ; RsMutex ctrlMutex; std::map mCompleted; diff --git a/libretroshare/src/ft/ftextralist.cc b/libretroshare/src/ft/ftextralist.cc index 9b81db1f9..7ff5fb681 100644 --- a/libretroshare/src/ft/ftextralist.cc +++ b/libretroshare/src/ft/ftextralist.cc @@ -43,60 +43,55 @@ ftExtraList::ftExtraList() :p3Config(), extMutex("p3Config") { - return; + cleanup = 0; + return; } -void ftExtraList::run() +void ftExtraList::data_tick() { - bool todo = false; - time_t cleanup = 0; - time_t now = 0; + bool todo = false; + time_t now = time(NULL); - while (isRunning()) - { #ifdef DEBUG_ELIST - //std::cerr << "ftExtraList::run() Iteration"; - //std::cerr << std::endl; + //std::cerr << "ftExtraList::run() Iteration"; + //std::cerr << std::endl; #endif - now = time(NULL); + { + RsStackMutex stack(extMutex); - { - RsStackMutex stack(extMutex); + todo = (mToHash.size() > 0); + } - todo = (mToHash.size() > 0); - } - - if (todo) - { - /* Hash a file */ - hashAFile(); + if (todo) + { + /* Hash a file */ + hashAFile(); #ifdef WIN32 - Sleep(1); + Sleep(1); #else - /* microsleep */ - usleep(10); + /* microsleep */ + usleep(10); #endif - } - else - { - /* cleanup */ - if (cleanup < now) - { - cleanupOldFiles(); - cleanup = now + CLEANUP_PERIOD; - } + } + else + { + /* cleanup */ + if (cleanup < now) + { + cleanupOldFiles(); + cleanup = now + CLEANUP_PERIOD; + } - /* sleep */ + /* sleep */ #ifdef WIN32 - Sleep(1000); + Sleep(1000); #else - sleep(1); + sleep(1); #endif - } - } + } } diff --git a/libretroshare/src/ft/ftextralist.h b/libretroshare/src/ft/ftextralist.h index 76a07d93b..9e1bed3bf 100644 --- a/libretroshare/src/ft/ftextralist.h +++ b/libretroshare/src/ft/ftextralist.h @@ -106,7 +106,7 @@ const uint32_t FT_DETAILS_REMOTE = 0x0002; const uint32_t CLEANUP_PERIOD = 600; /* 10 minutes */ -class ftExtraList: public RsThread, public p3Config, public ftSearch +class ftExtraList: public RsTickingThread, public p3Config, public ftSearch { public: @@ -143,7 +143,7 @@ virtual bool search(const RsFileHash &hash, FileSearchFlags hintflags, FileIn /*** * Thread Main Loop **/ -virtual void run(); +virtual void data_tick(); /*** * Configuration - store extra files. @@ -167,6 +167,8 @@ bool cleanupEntry(std::string path, TransferRequestFlags flags); std::map mHashedList; /* path -> hash ( not saved ) */ std::map mFiles; + + time_t cleanup ; }; diff --git a/libretroshare/src/ft/fttransfermodule.cc b/libretroshare/src/ft/fttransfermodule.cc index cac86a7b5..12e275431 100644 --- a/libretroshare/src/ft/fttransfermodule.cc +++ b/libretroshare/src/ft/fttransfermodule.cc @@ -542,13 +542,13 @@ bool ftTransferModule::isCheckingHash() return mFlag == FT_TM_FLAG_CHECKING || mFlag == FT_TM_FLAG_CHUNK_CRC; } -class HashThread: public RsThread +class HashThread: public RsSingleJobThread { public: HashThread(ftFileCreator *m) : _hashThreadMtx("HashThread"), _m(m),_finished(false),_hash("") {} - virtual void run() + virtual void run() { #ifdef FT_DEBUG std::cerr << "hash thread is running for file " << std::endl; @@ -609,8 +609,6 @@ bool ftTransferModule::checkFile() RsFileHash check_hash( _hash_thread->hash() ) ; - _hash_thread->join(); // allow releasing of resources when finished. - delete _hash_thread ; _hash_thread = NULL ; diff --git a/libretroshare/src/gxs/rsgenexchange.cc b/libretroshare/src/gxs/rsgenexchange.cc index eaf1d5ebb..e844437a2 100644 --- a/libretroshare/src/gxs/rsgenexchange.cc +++ b/libretroshare/src/gxs/rsgenexchange.cc @@ -146,16 +146,13 @@ RsGenExchange::~RsGenExchange() } -void RsGenExchange::run() +void RsGenExchange::data_tick() { - double timeDelta = 0.1; // slow tick in sec + static const double timeDelta = 0.1; // slow tick in sec - while(isRunning()) { tick(); - - usleep((int) (timeDelta * 1000 *1000)); // timeDelta sec - }//while(isRunning()) + usleep((int) (timeDelta * 1000 *1000)); // timeDelta sec } void RsGenExchange::tick() @@ -238,7 +235,6 @@ void RsGenExchange::tick() mNotifications.push_back(c); } - mIntegrityCheck->join(); delete mIntegrityCheck; mIntegrityCheck = NULL; mLastCheck = time(NULL); diff --git a/libretroshare/src/gxs/rsgenexchange.h b/libretroshare/src/gxs/rsgenexchange.h index 436ecc88b..ba59dcaac 100644 --- a/libretroshare/src/gxs/rsgenexchange.h +++ b/libretroshare/src/gxs/rsgenexchange.h @@ -109,7 +109,7 @@ typedef std::map > GxsMsgRelatedDa class RsGixs; -class RsGenExchange : public RsNxsObserver, public RsThread, public RsGxsIface +class RsGenExchange : public RsNxsObserver, public RsTickingThread, public RsGxsIface { public: @@ -182,7 +182,7 @@ public: */ RsTokenService* getTokenService(); - void run(); + virtual void data_tick(); /*! * Policy bit pattern portion diff --git a/libretroshare/src/gxs/rsgxsnetservice.cc b/libretroshare/src/gxs/rsgxsnetservice.cc index 9f74eb818..1f9a186f3 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.cc +++ b/libretroshare/src/gxs/rsgxsnetservice.cc @@ -73,6 +73,7 @@ RsGxsNetService::RsGxsNetService(uint16_t servType, RsGeneralDataService *gds, { addSerialType(new RsNxsSerialiser(mServType)); mOwnId = mNetMgr->getOwnId(); + mUpdateCounter = 0; } RsGxsNetService::~RsGxsNetService() @@ -1149,23 +1150,20 @@ bool RsGxsNetService::locked_processTransac(RsNxsTransac* item) return false; } -void RsGxsNetService::run() +void RsGxsNetService::data_tick() { - double timeDelta = 0.5; - int updateCounter = 0; + static const double timeDelta = 0.5; - while(isRunning()) - { //Start waiting as nothing to do in runup usleep((int) (timeDelta * 1000 * 1000)); // timeDelta sec - if(updateCounter >= 20) + if(mUpdateCounter >= 20) { updateServerSyncTS(); - updateCounter = 0; + mUpdateCounter = 0; } else - updateCounter++; + mUpdateCounter++; // process active transactions processTransactions(); @@ -1177,8 +1175,6 @@ void RsGxsNetService::run() runVetting(); processExplicitGroupRequests(); - - } } void RsGxsNetService::updateServerSyncTS() diff --git a/libretroshare/src/gxs/rsgxsnetservice.h b/libretroshare/src/gxs/rsgxsnetservice.h index bcdd8241d..4d6173aeb 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.h +++ b/libretroshare/src/gxs/rsgxsnetservice.h @@ -168,7 +168,7 @@ public: /*! * Processes transactions and job queue */ - void run(); + virtual void data_tick(); private: /*! @@ -477,6 +477,7 @@ private: uint32_t mLastKeyPublishTs; const uint32_t mSYNC_PERIOD; + int mUpdateCounter ; RsGcxs* mCircles; RsGixsReputation* mReputations; diff --git a/libretroshare/src/gxs/rsgxsutil.h b/libretroshare/src/gxs/rsgxsutil.h index e2a7d2a88..e7767bd77 100644 --- a/libretroshare/src/gxs/rsgxsutil.h +++ b/libretroshare/src/gxs/rsgxsutil.h @@ -61,7 +61,7 @@ inline RsGxsGrpMsgIdPair getMsgIdPair(RsGxsMsgItem& msg) * Does message clean up based on individual group expirations first * if avialable. If not then deletion s */ -class RsGxsMessageCleanUp : public RsThread +class RsGxsMessageCleanUp //: public RsThread { public: @@ -84,7 +84,7 @@ public: /*! * TODO: Rather than manual progressions consider running through a thread */ - void run(){} + //virtual void data_tick(){} private: @@ -97,7 +97,7 @@ private: * Checks the integrity message and groups * in rsDataService using computed hash */ -class RsGxsIntegrityCheck : public RsThread +class RsGxsIntegrityCheck : public RsSingleJobThread { enum CheckState { CheckStart, CheckChecking }; diff --git a/libretroshare/src/pqi/authgpg.cc b/libretroshare/src/pqi/authgpg.cc index 92309af9e..6a00ea5c3 100644 --- a/libretroshare/src/pqi/authgpg.cc +++ b/libretroshare/src/pqi/authgpg.cc @@ -130,6 +130,7 @@ AuthGPG::AuthGPG(const std::string& path_to_public_keyring,const std::string& pa { _force_sync_database = false ; start(); + int mCount = 0; } /* This function is called when retroshare is first started @@ -178,30 +179,26 @@ int AuthGPG::GPGInit(const RsPgpId &ownId) { } -void AuthGPG::run() +void AuthGPG::data_tick() { - int count = 0; + usleep(100 * 1000); //100 msec - while (isRunning()) { - usleep(100 * 1000); //100 msec + /// every 100 milliseconds + processServices(); - /// every 100 milliseconds - processServices(); + /// every ten seconds + if (++mCount >= 100 || _force_sync_database) { + RsStackMutex stack(gpgMtxService); ///******* LOCKED ****** - /// every ten seconds - if (++count >= 100 || _force_sync_database) { - RsStackMutex stack(gpgMtxService); ///******* LOCKED ****** - - /// The call does multiple things at once: - /// - checks whether the keyring has changed in memory - /// - checks whether the keyring has changed on disk. - /// - merges/updates according to status. - /// - PGPHandler::syncDatabase() ; - count = 0; - _force_sync_database = false ; - }//if (++count >= 100 || _force_sync_database) - }//while (isRunning()) + /// The call does multiple things at once: + /// - checks whether the keyring has changed in memory + /// - checks whether the keyring has changed on disk. + /// - merges/updates according to status. + /// + PGPHandler::syncDatabase() ; + mCount = 0; + _force_sync_database = false ; + }//if (++count >= 100 || _force_sync_database) } void AuthGPG::processServices() diff --git a/libretroshare/src/pqi/authgpg.h b/libretroshare/src/pqi/authgpg.h index bfa09a6c5..b7d91c289 100644 --- a/libretroshare/src/pqi/authgpg.h +++ b/libretroshare/src/pqi/authgpg.h @@ -96,7 +96,7 @@ public: virtual void setGPGOperation(AuthGPGOperation *operation) = 0; }; -class AuthGPG: public p3Config, public RsThread, public PGPHandler +class AuthGPG: public p3Config, public RsTickingThread, public PGPHandler { public: @@ -274,7 +274,7 @@ class AuthGPG: public p3Config, public RsThread, public PGPHandler bool printOwnKeys_locked(); /* own thread */ - virtual void run(); + virtual void data_tick(); private: @@ -295,7 +295,8 @@ class AuthGPG: public p3Config, public RsThread, public PGPHandler RsPgpId mOwnGpgId; bool gpgKeySelected; - bool _force_sync_database ; + bool _force_sync_database ; + uint32_t mCount ; std::list services ; diff --git a/libretroshare/src/pqi/pqithreadstreamer.cc b/libretroshare/src/pqi/pqithreadstreamer.cc index 38b26cc47..a659b35bb 100644 --- a/libretroshare/src/pqi/pqithreadstreamer.cc +++ b/libretroshare/src/pqi/pqithreadstreamer.cc @@ -53,7 +53,7 @@ int pqithreadstreamer::tick() return 0; } -int pqithreadstreamer::data_tick() +void pqithreadstreamer::data_tick() { uint32_t recv_timeout = 0; uint32_t sleep_period = 0; @@ -68,7 +68,7 @@ int pqithreadstreamer::data_tick() if (!isactive) { usleep(DEFAULT_STREAMER_IDLE_SLEEP); - return 0; + return ; } { @@ -92,7 +92,6 @@ int pqithreadstreamer::data_tick() { usleep(sleep_period); } - return 1; } diff --git a/libretroshare/src/pqi/pqithreadstreamer.h b/libretroshare/src/pqi/pqithreadstreamer.h index c31f0a7ab..65cfabdfe 100644 --- a/libretroshare/src/pqi/pqithreadstreamer.h +++ b/libretroshare/src/pqi/pqithreadstreamer.h @@ -29,27 +29,25 @@ #include "pqi/pqistreamer.h" #include "util/rsthreads.h" -class pqithreadstreamer: public pqistreamer, public RsThread +class pqithreadstreamer: public pqistreamer, public RsTickingThread { - public: - pqithreadstreamer(PQInterface *parent, RsSerialiser *rss, const RsPeerId& peerid, BinInterface *bio_in, int bio_flagsin); +public: + pqithreadstreamer(PQInterface *parent, RsSerialiser *rss, const RsPeerId& peerid, BinInterface *bio_in, int bio_flagsin); -virtual bool RecvItem(RsItem *item); -virtual int tick(); + // from pqistreamer + virtual bool RecvItem(RsItem *item); + virtual int tick(); protected: - // from RsThread - virtual void run(); + virtual void data_tick(); - int data_tick(); - - PQInterface *mParent; - uint32_t mTimeout; - uint32_t mSleepPeriod; + PQInterface *mParent; + uint32_t mTimeout; + uint32_t mSleepPeriod; private: - /* thread variables */ - RsMutex mThreadMutex; + /* thread variables */ + RsMutex mThreadMutex; }; #endif //MRK_PQI_THREAD_STREAMER_HEADER diff --git a/libretroshare/src/rsserver/p3face-config.cc b/libretroshare/src/rsserver/p3face-config.cc index 63b424c6e..4745f6eeb 100644 --- a/libretroshare/src/rsserver/p3face-config.cc +++ b/libretroshare/src/rsserver/p3face-config.cc @@ -71,7 +71,7 @@ void RsServer::ConfigFinalSave() mConfigMgr->completeConfiguration(); } -void RsServer::startServiceThread(RsThread *t) +void RsServer::startServiceThread(RsTickingThread *t) { t->start() ; mRegisteredServiceThreads.push_back(t) ; @@ -87,13 +87,13 @@ void RsServer::rsGlobalShutDown() mNetMgr->shutdown(); /* Handles UPnP */ - join(); + fullstop() ; - // kill all registered service threads + // kill all registered service threads - for(std::list::iterator it= mRegisteredServiceThreads.begin();it!=mRegisteredServiceThreads.end();++it) + for(std::list::iterator it= mRegisteredServiceThreads.begin();it!=mRegisteredServiceThreads.end();++it) { - (*it)->join() ; + (*it)->fullstop() ; } // #ifdef RS_ENABLE_GXS // // We should automate this. diff --git a/libretroshare/src/rsserver/p3face-server.cc b/libretroshare/src/rsserver/p3face-server.cc index 6cc28f6f5..5b8c993a3 100644 --- a/libretroshare/src/rsserver/p3face-server.cc +++ b/libretroshare/src/rsserver/p3face-server.cc @@ -49,6 +49,25 @@ int rsserverzone = 101; ****/ #define WARN_BIG_CYCLE_TIME (0.2) +#ifdef WINDOWS_SYS +#include +#include +#endif + +static double getCurrentTS() +{ + +#ifndef WINDOWS_SYS + struct timeval cts_tmp; + gettimeofday(&cts_tmp, NULL); + double cts = (cts_tmp.tv_sec) + ((double) cts_tmp.tv_usec) / 1000000.0; +#else + struct _timeb timebuf; + _ftime( &timebuf); + double cts = (timebuf.time) + ((double) timebuf.millitm) / 1000.0; +#endif + return cts; +} RsServer::RsServer() @@ -74,22 +93,20 @@ RsServer::RsServer() chatSrv = NULL; mStatusSrv = NULL; - /* caches (that need ticking) */ + mMin = 0; + mLoop = 0; + + mAvgTickRate = mTimeDelta; + + mLastts = getCurrentTS(); + mLastSec = 0; /* for the slower ticked stuff */ + mTimeDelta = 0.25 ; + + /* caches (that need ticking) */ /* Config */ mConfigMgr = NULL; mGeneralConfig = NULL; - - /* GXS - Amazingly we can still initialise these - * even without knowing the data-types (they are just pointers???) - */ -// mPhoto = NULL; -// mWiki = NULL; -// mPosted = NULL; -// mGxsCircles = NULL; -// mGxsIdService = NULL; -// mGxsForums = NULL; -// mWire = NULL; } RsServer::~RsServer() @@ -101,182 +118,143 @@ RsServer::~RsServer() ----> MUST BE LOCKED! */ -#ifdef WINDOWS_SYS -#include -#include -#endif - -static double getCurrentTS() -{ - -#ifndef WINDOWS_SYS - struct timeval cts_tmp; - gettimeofday(&cts_tmp, NULL); - double cts = (cts_tmp.tv_sec) + ((double) cts_tmp.tv_usec) / 1000000.0; -#else - struct _timeb timebuf; - _ftime( &timebuf); - double cts = (timebuf.time) + ((double) timebuf.millitm) / 1000.0; -#endif - return cts; -} /* Thread Fn: Run the Core */ -void RsServer::run() +void RsServer::data_tick() { - - double timeDelta = 0.25; - double minTimeDelta = 0.1; // 25; - double maxTimeDelta = 0.5; - double kickLimit = 0.15; - - double avgTickRate = timeDelta; - - double lastts, ts; - lastts = ts = getCurrentTS(); - - long lastSec = 0; /* for the slower ticked stuff */ - - int min = 0; - int loop = 0; - - while(isRunning()) - { #ifndef WINDOWS_SYS - usleep((int) (timeDelta * 1000000)); + usleep((int) (mTimeDelta * 1000000)); #else - Sleep((int) (timeDelta * 1000)); + Sleep((int) (mTimeDelta * 1000)); #endif - ts = getCurrentTS(); - double delta = ts - lastts; + double ts = getCurrentTS(); + double delta = ts - mLastts; - /* for the fast ticked stuff */ - if (delta > timeDelta) - { + /* for the fast ticked stuff */ + if (delta > mTimeDelta) + { #ifdef DEBUG_TICK - std::cerr << "Delta: " << delta << std::endl; - std::cerr << "Time Delta: " << timeDelta << std::endl; - std::cerr << "Avg Tick Rate: " << avgTickRate << std::endl; + std::cerr << "Delta: " << delta << std::endl; + std::cerr << "Time Delta: " << mTimeDelta << std::endl; + std::cerr << "Avg Tick Rate: " << mAvgTickRate << std::endl; #endif - lastts = ts; + mLastts = ts; - /******************************** RUN SERVER *****************/ - lockRsCore(); + /******************************** RUN SERVER *****************/ + lockRsCore(); - int moreToTick = pqih->tick(); + int moreToTick = pqih->tick(); #ifdef DEBUG_TICK - std::cerr << "RsServer::run() ftserver->tick(): moreToTick: " << moreToTick << std::endl; + std::cerr << "RsServer::run() ftserver->tick(): moreToTick: " << moreToTick << std::endl; #endif - unlockRsCore(); + unlockRsCore(); - /* tick the Managers */ - mPeerMgr->tick(); - mLinkMgr->tick(); - mNetMgr->tick(); - /******************************** RUN SERVER *****************/ + /* tick the Managers */ + mPeerMgr->tick(); + mLinkMgr->tick(); + mNetMgr->tick(); + /******************************** RUN SERVER *****************/ - /* adjust tick rate depending on whether there is more. - */ + /* adjust tick rate depending on whether there is more. + */ - avgTickRate = 0.2 * timeDelta + 0.8 * avgTickRate; + mAvgTickRate = 0.2 * mTimeDelta + 0.8 * mAvgTickRate; - if (1 == moreToTick) - { - timeDelta = 0.9 * avgTickRate; - if (timeDelta > kickLimit) - { - /* force next tick in one sec - * if we are reading data. - */ - timeDelta = kickLimit; - avgTickRate = kickLimit; - } - } - else - { - timeDelta = 1.1 * avgTickRate; - } + if (1 == moreToTick) + { + mTimeDelta = 0.9 * mAvgTickRate; + if (mTimeDelta > kickLimit) + { + /* force next tick in one sec + * if we are reading data. + */ + mTimeDelta = kickLimit; + mAvgTickRate = kickLimit; + } + } + else + { + mTimeDelta = 1.1 * mAvgTickRate; + } - /* limiter */ - if (timeDelta < minTimeDelta) - { - timeDelta = minTimeDelta; - } - else if (timeDelta > maxTimeDelta) - { - timeDelta = maxTimeDelta; - } + /* limiter */ + if (mTimeDelta < minTimeDelta) + { + mTimeDelta = minTimeDelta; + } + else if (mTimeDelta > maxTimeDelta) + { + mTimeDelta = maxTimeDelta; + } - /* Fast Updates */ + /* Fast Updates */ - /* now we have the slow ticking stuff */ - /* stuff ticked once a second (but can be slowed down) */ - if ((int) ts > lastSec) - { - lastSec = (int) ts; + /* now we have the slow ticking stuff */ + /* stuff ticked once a second (but can be slowed down) */ + if ((int) ts > mLastSec) + { + mLastSec = (int) ts; - // Every second! (UDP keepalive). - //tou_tick_stunkeepalive(); + // Every second! (UDP keepalive). + //tou_tick_stunkeepalive(); - // every five loops (> 5 secs) - if (loop % 5 == 0) - { - // update_quick_stats(); + // every five loops (> 5 secs) + if (mLoop % 5 == 0) + { + // update_quick_stats(); - // Update All Every 5 Seconds. - // These Update Functions do the locking themselves. + // Update All Every 5 Seconds. + // These Update Functions do the locking themselves. #ifdef DEBUG_TICK - std::cerr << "RsServer::run() Updates()" << std::endl; + std::cerr << "RsServer::run() Updates()" << std::endl; #endif - mConfigMgr->tick(); /* saves stuff */ + mConfigMgr->tick(); /* saves stuff */ - } + } - // every 60 loops (> 1 min) - if (++loop >= 60) - { - loop = 0; + // every 60 loops (> 1 min) + if (++mLoop >= 60) + { + mLoop = 0; - /* force saving FileTransferStatus TODO */ - //ftserver->saveFileTransferStatus(); + /* force saving FileTransferStatus TODO */ + //ftserver->saveFileTransferStatus(); - /* see if we need to resave certs */ - //AuthSSL::getAuthSSL()->CheckSaveCertificates(); + /* see if we need to resave certs */ + //AuthSSL::getAuthSSL()->CheckSaveCertificates(); - /* hour loop */ - if (++min >= 60) - { - min = 0; - } - } + /* hour loop */ + if (++mMin >= 60) + { + mMin = 0; + } + } - /* Tick slow services */ - if(rsPlugins) - rsPlugins->slowTickPlugins((time_t)ts); + /* Tick slow services */ + if(rsPlugins) + rsPlugins->slowTickPlugins((time_t)ts); - // slow update tick as well. - // update(); - } // end of slow tick. + // slow update tick as well. + // update(); + } // end of slow tick. - } // end of only once a second. + } // end of only once a second. - double endCycleTs = getCurrentTS(); - double cycleTime = endCycleTs - ts; - if (cycleTime > WARN_BIG_CYCLE_TIME) - { - std::string out; - rs_sprintf(out, "RsServer::run() WARNING Excessively Long Cycle Time: %g secs => Please DEBUG", cycleTime); - std::cerr << out << std::endl; + double endCycleTs = getCurrentTS(); + double cycleTime = endCycleTs - ts; + if (cycleTime > WARN_BIG_CYCLE_TIME) + { + std::string out; + rs_sprintf(out, "RsServer::run() WARNING Excessively Long Cycle Time: %g secs => Please DEBUG", cycleTime); + std::cerr << out << std::endl; - rslog(RSL_ALERT, rsserverzone, out); - } - } - return; + rslog(RSL_ALERT, rsserverzone, out); + } } diff --git a/libretroshare/src/rsserver/p3face.h b/libretroshare/src/rsserver/p3face.h index ce2ec184d..725631921 100644 --- a/libretroshare/src/rsserver/p3face.h +++ b/libretroshare/src/rsserver/p3face.h @@ -74,7 +74,7 @@ class RsPluginManager; //int InitRetroShare(int argc, char **argv, RsInit *config); //int LoadCertificates(RsInit *config); -class RsServer: public RsControl, public RsThread +class RsServer: public RsControl, public RsTickingThread { public: /****************************************/ @@ -88,7 +88,7 @@ class RsServer: public RsControl, public RsThread virtual ~RsServer(); /* Thread Fn: Run the Core */ - virtual void run(); + virtual void data_tick(); /* locking stuff */ void lockRsCore() @@ -120,7 +120,7 @@ class RsServer: public RsControl, public RsThread /* Config */ virtual void ConfigFinalSave( ); - virtual void startServiceThread(RsThread *t) ; + virtual void startServiceThread(RsTickingThread *t) ; /************* Rs shut down function: in upnp 'port lease time' bug *****************/ @@ -162,7 +162,7 @@ class RsServer: public RsControl, public RsThread // This list contains all threaded services. It will be used to shut them down properly. - std::list mRegisteredServiceThreads ; + std::list mRegisteredServiceThreads ; /* GXS */ // p3Wiki *mWiki; @@ -184,6 +184,16 @@ class RsServer: public RsControl, public RsThread // Worker Data..... + int mMin ; + int mLoop ; + int mLastts ; + long mLastSec ; + double mAvgTickRate ; + double mTimeDelta ; + + static const double minTimeDelta = 0.1; // 25; + static const double maxTimeDelta = 0.5; + static const double kickLimit = 0.15; }; /* Helper function to convert windows paths diff --git a/libretroshare/src/services/p3service.h b/libretroshare/src/services/p3service.h index c5a204471..daba663c7 100644 --- a/libretroshare/src/services/p3service.h +++ b/libretroshare/src/services/p3service.h @@ -157,7 +157,7 @@ virtual bool send(RsRawItem *item) }; -class p3ThreadedService: public p3Service, public RsThread +class p3ThreadedService: public p3Service, public RsTickingThread { protected: diff --git a/libretroshare/src/util/rsthreads.cc b/libretroshare/src/util/rsthreads.cc index f35e4a564..71a71e7ce 100644 --- a/libretroshare/src/util/rsthreads.cc +++ b/libretroshare/src/util/rsthreads.cc @@ -56,11 +56,33 @@ void *RsThread::rsthread_init(void* p) { return NULL; } - thread -> run(); + // tell the OS to free the thread resources when this function exits + // it is a replacement for pthread_join() + pthread_detach(pthread_self()); + + thread -> runloop(); return NULL; } +RsThread::RsThread () : mMutex("RsThread") +{ + sem_init(&mHasStoppedSemaphore,0,1) ; -void RsThread::shutdown() +#ifdef WINDOWS_SYS + memset (&mTid, 0, sizeof(mTid)); +#else + mTid = 0; +#endif +} +bool RsThread::isRunning() +{ + // do we need a mutex for this ? + int sval =0; + sem_getvalue(&mHasStoppedSemaphore,&sval) ; + + return !sval ; +} + +void RsTickingThread::shutdown() { #ifdef DEBUG_THREADS std::cerr << "pqithreadstreamer::stop()" << std::endl; @@ -83,7 +105,7 @@ void RsThread::shutdown() sem_post(&mShouldStopSemaphore) ; } -void RsThread::fullstop() +void RsTickingThread::fullstop() { shutdown() ; @@ -107,7 +129,6 @@ void RsThread::start() std::cerr << " initing should_stop=0" << std::endl; std::cerr << " initing has_stopped=1" << std::endl; #endif - sem_init(&mShouldStopSemaphore,0,0) ; sem_init(&mHasStoppedSemaphore,0,0) ; int err ; @@ -125,37 +146,19 @@ void RsThread::start() } -RsThread::RsThread () : mMutex("RsThread") + +RsTickingThread::RsTickingThread () { sem_init(&mShouldStopSemaphore,0,0) ; - sem_init(&mHasStoppedSemaphore,0,1) ; - -#ifdef WINDOWS_SYS - memset (&mTid, 0, sizeof(mTid)); -#else - mTid = 0; -#endif } -bool RsThread::isRunning() -{ - // do we need a mutex for this ? - int sval =0; - sem_getvalue(&mHasStoppedSemaphore,&sval) ; - - return !sval ; -} - -void RsThread::run() +void RsTickingThread::runloop() { #ifdef DEBUG_THREADS std::cerr << "pqithreadstream::run()"; std::cerr << std::endl; #endif - - // tell the OS to free the thread resources when this function exits - // it is a replacement for pthread_join() - pthread_detach(pthread_self()); + sem_init(&mShouldStopSemaphore,0,0) ; while(1) { diff --git a/libretroshare/src/util/rsthreads.h b/libretroshare/src/util/rsthreads.h index 3a76f0845..720705728 100644 --- a/libretroshare/src/util/rsthreads.h +++ b/libretroshare/src/util/rsthreads.h @@ -177,31 +177,50 @@ pthread_t createThread(RsThread &thread); class RsThread { -public: + public: RsThread(); virtual ~RsThread() {} void start() ; - void shutdown(); - void fullstop(); - void join() { fullstop() ; } // used for compatibility bool isRunning(); protected: - void run() ; /* called once the thread is started. Should be overloaded by subclasses. */ + virtual void runloop() =0; /* called once the thread is started. Should be overloaded by subclasses. */ -private: - static void *rsthread_init(void*) ; - - pthread_t mTid; - RsMutex mMutex; - - sem_t mShouldStopSemaphore; sem_t mHasStoppedSemaphore; + + static void *rsthread_init(void*) ; + RsMutex mMutex; + pthread_t mTid; }; +class RsTickingThread: public RsThread +{ +public: + RsTickingThread(); -class RsQueueThread: public RsThread + void shutdown(); + void fullstop(); + void join() { fullstop() ; } // used for compatibility + + virtual void data_tick() =0; + +private: + virtual void runloop() ; /* called once the thread is started. Should be overloaded by subclasses. */ + + sem_t mShouldStopSemaphore; +}; + +class RsSingleJobThread: public RsThread +{ +public: + virtual void run() =0; + +protected: + virtual void runloop() { run(); } /* called once the thread is started. Should be overloaded by subclasses. */ +}; + +class RsQueueThread: public RsTickingThread { public: diff --git a/plugins/FeedReader/services/p3FeedReaderThread.cc b/plugins/FeedReader/services/p3FeedReaderThread.cc index 423bc7dd4..31cbeb536 100644 --- a/plugins/FeedReader/services/p3FeedReaderThread.cc +++ b/plugins/FeedReader/services/p3FeedReaderThread.cc @@ -37,7 +37,7 @@ enum FeedFormat { FORMAT_RSS, FORMAT_RDF, FORMAT_ATOM }; *********/ p3FeedReaderThread::p3FeedReaderThread(p3FeedReader *feedReader, Type type, const std::string &feedId) : - RsThread(), mFeedReader(feedReader), mType(type), mFeedId(feedId) + RsTickingThread(), mFeedReader(feedReader), mType(type), mFeedId(feedId) { } @@ -49,9 +49,8 @@ p3FeedReaderThread::~p3FeedReaderThread() /****************************** Thread *************************************/ /***************************************************************************/ -void p3FeedReaderThread::run() +void p3FeedReaderThread::data_tick() { - while (isRunning()) { #ifdef WIN32 Sleep(1000); #else @@ -148,7 +147,6 @@ void p3FeedReaderThread::run() } break; } - } } /***************************************************************************/ diff --git a/plugins/FeedReader/services/p3FeedReaderThread.h b/plugins/FeedReader/services/p3FeedReaderThread.h index e6c43f0dd..840e2e8f8 100644 --- a/plugins/FeedReader/services/p3FeedReaderThread.h +++ b/plugins/FeedReader/services/p3FeedReaderThread.h @@ -33,7 +33,7 @@ class RsFeedReaderMsg; class HTMLWrapper; class RsFeedReaderXPath; -class p3FeedReaderThread : public RsThread +class p3FeedReaderThread : public RsTickingThread { public: enum Type @@ -56,7 +56,7 @@ public: static RsFeedReaderErrorState processTransformation(const RsFeedReaderFeed &feed, RsFeedReaderMsg *msg, std::string &errorString); private: - virtual void run(); + virtual void data_tick(); RsFeedReaderErrorState download(const RsFeedReaderFeed &feed, std::string &content, std::string &icon, std::string &errorString); RsFeedReaderErrorState process(const RsFeedReaderFeed &feed, std::list &entries, std::string &errorString); diff --git a/retroshare-nogui/src/TerminalApiClient.cpp b/retroshare-nogui/src/TerminalApiClient.cpp index 76d299ed5..62f6ebb5e 100644 --- a/retroshare-nogui/src/TerminalApiClient.cpp +++ b/retroshare-nogui/src/TerminalApiClient.cpp @@ -93,7 +93,7 @@ TerminalApiClient::TerminalApiClient(ApiServer *api): TerminalApiClient::~TerminalApiClient() { - join(); + //join(); } void TerminalApiClient::run() diff --git a/retroshare-nogui/src/TerminalApiClient.h b/retroshare-nogui/src/TerminalApiClient.h index 6a8b1c654..0b4c2683d 100644 --- a/retroshare-nogui/src/TerminalApiClient.h +++ b/retroshare-nogui/src/TerminalApiClient.h @@ -8,7 +8,7 @@ namespace resource_api { // - account selection // - login // - shutdown -class TerminalApiClient: private RsThread{ +class TerminalApiClient: private RsSingleJobThread{ public: // zero setup: create an instance of this class and destroy it when not needed anymore // no need to call start or stop or something