created 2 subclasses of RsThread, one for ticking services, and one for single shot jobs. Now all threads use the same base code.

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@8288 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2015-05-22 20:54:38 +00:00
parent f2d4a237ca
commit e9b9dce9f5
28 changed files with 317 additions and 335 deletions

View File

@ -43,8 +43,7 @@ RsControlModule::RsControlModule(int argc, char **argv, StateTokenServer* sts, A
RsControlModule::~RsControlModule() RsControlModule::~RsControlModule()
{ {
if(isRunning()) // join();
join();
} }
bool RsControlModule::processShouldExit() bool RsControlModule::processShouldExit()

View File

@ -19,8 +19,7 @@ class ApiServer;
// - handle password callback // - handle password callback
// - confirm plugin loading // - confirm plugin loading
// - shutdown retroshare // - shutdown retroshare
class RsControlModule: public ResourceRouter, NotifyClient, class RsControlModule: public ResourceRouter, NotifyClient, private RsSingleJobThread
private RsThread
{ {
public: public:
// ApiServer will be called once RS is started, to load additional api modules // ApiServer will be called once RS is started, to load additional api modules

View File

@ -61,8 +61,8 @@ FileIndexMonitor::FileIndexMonitor(CacheStrapper *cs, std::string cachedir, cons
mForceCheck(false), mInCheck(false), hashCache(config_dir+"/" + "file_cache"),useHashCache(true) mForceCheck(false), mInCheck(false), hashCache(config_dir+"/" + "file_cache"),useHashCache(true)
{ {
updatePeriod = 15 * 60; // 15 minutes updatePeriod = 15 * 60; // 15 minutes
reference_time = 0 ; reference_time = 0 ;
} }
bool FileIndexMonitor::autoCheckEnabled() const bool FileIndexMonitor::autoCheckEnabled() const
@ -622,15 +622,13 @@ void FileIndexMonitor::setPeriod(int period)
#endif #endif
} }
void FileIndexMonitor::run() void FileIndexMonitor::data_tick()
{ {
if(autoCheckEnabled()) if(autoCheckEnabled())
updateCycle(); updateCycle();
while(isRunning()) int i=0 ;
{ for(;;++i)
int i=0 ;
for(;;++i)
{ {
if(!isRunning()) if(!isRunning())
return; return;
@ -651,7 +649,6 @@ void FileIndexMonitor::run()
if(i < abs(updatePeriod) || autoCheckEnabled()) if(i < abs(updatePeriod) || autoCheckEnabled())
updateCycle(); updateCycle();
}
} }
void FileIndexMonitor::updateCycle() void FileIndexMonitor::updateCycle()

View File

@ -103,7 +103,7 @@ class HashCache
* FileIndexMonitor * FileIndexMonitor
*****************************************************************************************/ *****************************************************************************************/
class FileIndexMonitor: public CacheSource, public RsThread class FileIndexMonitor: public CacheSource, public RsTickingThread
{ {
public: public:
FileIndexMonitor(CacheStrapper *cs, std::string cachedir, const RsPeerId& pid, const std::string& config_dir); FileIndexMonitor(CacheStrapper *cs, std::string cachedir, const RsPeerId& pid, const std::string& config_dir);
@ -131,7 +131,7 @@ class FileIndexMonitor: public CacheSource, public RsThread
/* the FileIndexMonitor inner workings */ /* the FileIndexMonitor inner workings */
//virtual void run(std::string& currentJob); /* overloaded from RsThread */ //virtual void run(std::string& currentJob); /* overloaded from RsThread */
//void updateCycle(std::string& currentJob); //void updateCycle(std::string& currentJob);
virtual void run(); /* overloaded from RsThread */ virtual void data_tick(); /* overloaded from RsThread */
void updateCycle(); void updateCycle();
// Interface for browsing dir hirarchy // Interface for browsing dir hirarchy

View File

@ -115,6 +115,7 @@ ftController::ftController(CacheStrapper *cs, ftDataMultiplex *dm, p3ServiceCont
_max_active_downloads = 5 ; // default queue size _max_active_downloads = 5 ; // default queue size
_min_prioritized_transfers = 3 ; _min_prioritized_transfers = 3 ;
/* TODO */ /* TODO */
cnt = 0 ;
} }
void ftController::setTurtleRouter(p3turtle *pt) { mTurtle = pt ; } void ftController::setTurtleRouter(p3turtle *pt) { mTurtle = pt ; }
@ -209,14 +210,10 @@ void ftController::removeFileSource(const RsFileHash& hash,const RsPeerId& peer_
std::cerr << "... not added: hash not found." << std::endl ; std::cerr << "... not added: hash not found." << std::endl ;
#endif #endif
} }
void ftController::run() void ftController::data_tick()
{ {
/* check the queues */ /* check the queues */
uint32_t cnt = 0 ;
while(isRunning())
{
//Waiting 1 sec before start //Waiting 1 sec before start
usleep(1*1000*1000); // 1 sec usleep(1*1000*1000); // 1 sec
@ -276,8 +273,6 @@ void ftController::run()
if(cnt++ % 10 == 0) if(cnt++ % 10 == 0)
checkDownloadQueue() ; checkDownloadQueue() ;
}
} }
void ftController::searchForDirectSources() void ftController::searchForDirectSources()

View File

@ -113,7 +113,7 @@ class ftPendingRequest
}; };
class ftController: public CacheTransfer, public RsThread, public pqiServiceMonitor, public p3Config class ftController: public CacheTransfer, public RsTickingThread, public pqiServiceMonitor, public p3Config
{ {
public: public:
@ -126,7 +126,7 @@ class ftController: public CacheTransfer, public RsThread, public pqiServiceMoni
bool activate(); bool activate();
bool isActiveAndNoPending(); bool isActiveAndNoPending();
virtual void run(); virtual void data_tick();
/***************************************************************/ /***************************************************************/
/********************** Controller Access **********************/ /********************** Controller Access **********************/
@ -243,6 +243,7 @@ class ftController: public CacheTransfer, public RsThread, public pqiServiceMoni
p3ServiceControl *mServiceCtrl; p3ServiceControl *mServiceCtrl;
uint32_t mFtServiceId; uint32_t mFtServiceId;
uint32_t cnt ;
RsMutex ctrlMutex; RsMutex ctrlMutex;
std::map<RsFileHash, ftFileControl*> mCompleted; std::map<RsFileHash, ftFileControl*> mCompleted;

View File

@ -43,60 +43,55 @@
ftExtraList::ftExtraList() ftExtraList::ftExtraList()
:p3Config(), extMutex("p3Config") :p3Config(), extMutex("p3Config")
{ {
return; cleanup = 0;
return;
} }
void ftExtraList::run() void ftExtraList::data_tick()
{ {
bool todo = false; bool todo = false;
time_t cleanup = 0; time_t now = time(NULL);
time_t now = 0;
while (isRunning())
{
#ifdef DEBUG_ELIST #ifdef DEBUG_ELIST
//std::cerr << "ftExtraList::run() Iteration"; //std::cerr << "ftExtraList::run() Iteration";
//std::cerr << std::endl; //std::cerr << std::endl;
#endif #endif
now = time(NULL); {
RsStackMutex stack(extMutex);
{ todo = (mToHash.size() > 0);
RsStackMutex stack(extMutex); }
todo = (mToHash.size() > 0); if (todo)
} {
/* Hash a file */
if (todo) hashAFile();
{
/* Hash a file */
hashAFile();
#ifdef WIN32 #ifdef WIN32
Sleep(1); Sleep(1);
#else #else
/* microsleep */ /* microsleep */
usleep(10); usleep(10);
#endif #endif
} }
else else
{ {
/* cleanup */ /* cleanup */
if (cleanup < now) if (cleanup < now)
{ {
cleanupOldFiles(); cleanupOldFiles();
cleanup = now + CLEANUP_PERIOD; cleanup = now + CLEANUP_PERIOD;
} }
/* sleep */ /* sleep */
#ifdef WIN32 #ifdef WIN32
Sleep(1000); Sleep(1000);
#else #else
sleep(1); sleep(1);
#endif #endif
} }
}
} }

View File

@ -106,7 +106,7 @@ const uint32_t FT_DETAILS_REMOTE = 0x0002;
const uint32_t CLEANUP_PERIOD = 600; /* 10 minutes */ const uint32_t CLEANUP_PERIOD = 600; /* 10 minutes */
class ftExtraList: public RsThread, public p3Config, public ftSearch class ftExtraList: public RsTickingThread, public p3Config, public ftSearch
{ {
public: public:
@ -143,7 +143,7 @@ virtual bool search(const RsFileHash &hash, FileSearchFlags hintflags, FileIn
/*** /***
* Thread Main Loop * Thread Main Loop
**/ **/
virtual void run(); virtual void data_tick();
/*** /***
* Configuration - store extra files. * Configuration - store extra files.
@ -167,6 +167,8 @@ bool cleanupEntry(std::string path, TransferRequestFlags flags);
std::map<std::string, RsFileHash> mHashedList; /* path -> hash ( not saved ) */ std::map<std::string, RsFileHash> mHashedList; /* path -> hash ( not saved ) */
std::map<RsFileHash, FileDetails> mFiles; std::map<RsFileHash, FileDetails> mFiles;
time_t cleanup ;
}; };

View File

@ -542,13 +542,13 @@ bool ftTransferModule::isCheckingHash()
return mFlag == FT_TM_FLAG_CHECKING || mFlag == FT_TM_FLAG_CHUNK_CRC; return mFlag == FT_TM_FLAG_CHECKING || mFlag == FT_TM_FLAG_CHUNK_CRC;
} }
class HashThread: public RsThread class HashThread: public RsSingleJobThread
{ {
public: public:
HashThread(ftFileCreator *m) HashThread(ftFileCreator *m)
: _hashThreadMtx("HashThread"), _m(m),_finished(false),_hash("") {} : _hashThreadMtx("HashThread"), _m(m),_finished(false),_hash("") {}
virtual void run() virtual void run()
{ {
#ifdef FT_DEBUG #ifdef FT_DEBUG
std::cerr << "hash thread is running for file " << std::endl; std::cerr << "hash thread is running for file " << std::endl;
@ -609,8 +609,6 @@ bool ftTransferModule::checkFile()
RsFileHash check_hash( _hash_thread->hash() ) ; RsFileHash check_hash( _hash_thread->hash() ) ;
_hash_thread->join(); // allow releasing of resources when finished.
delete _hash_thread ; delete _hash_thread ;
_hash_thread = NULL ; _hash_thread = NULL ;

View File

@ -146,16 +146,13 @@ RsGenExchange::~RsGenExchange()
} }
void RsGenExchange::run() void RsGenExchange::data_tick()
{ {
double timeDelta = 0.1; // slow tick in sec static const double timeDelta = 0.1; // slow tick in sec
while(isRunning()) {
tick(); tick();
usleep((int) (timeDelta * 1000 *1000)); // timeDelta sec
usleep((int) (timeDelta * 1000 *1000)); // timeDelta sec
}//while(isRunning())
} }
void RsGenExchange::tick() void RsGenExchange::tick()
@ -238,7 +235,6 @@ void RsGenExchange::tick()
mNotifications.push_back(c); mNotifications.push_back(c);
} }
mIntegrityCheck->join();
delete mIntegrityCheck; delete mIntegrityCheck;
mIntegrityCheck = NULL; mIntegrityCheck = NULL;
mLastCheck = time(NULL); mLastCheck = time(NULL);

View File

@ -109,7 +109,7 @@ typedef std::map<RsGxsGrpMsgIdPair, std::vector<RsGxsMsgItem*> > GxsMsgRelatedDa
class RsGixs; class RsGixs;
class RsGenExchange : public RsNxsObserver, public RsThread, public RsGxsIface class RsGenExchange : public RsNxsObserver, public RsTickingThread, public RsGxsIface
{ {
public: public:
@ -182,7 +182,7 @@ public:
*/ */
RsTokenService* getTokenService(); RsTokenService* getTokenService();
void run(); virtual void data_tick();
/*! /*!
* Policy bit pattern portion * Policy bit pattern portion

View File

@ -73,6 +73,7 @@ RsGxsNetService::RsGxsNetService(uint16_t servType, RsGeneralDataService *gds,
{ {
addSerialType(new RsNxsSerialiser(mServType)); addSerialType(new RsNxsSerialiser(mServType));
mOwnId = mNetMgr->getOwnId(); mOwnId = mNetMgr->getOwnId();
mUpdateCounter = 0;
} }
RsGxsNetService::~RsGxsNetService() RsGxsNetService::~RsGxsNetService()
@ -1149,23 +1150,20 @@ bool RsGxsNetService::locked_processTransac(RsNxsTransac* item)
return false; return false;
} }
void RsGxsNetService::run() void RsGxsNetService::data_tick()
{ {
double timeDelta = 0.5; static const double timeDelta = 0.5;
int updateCounter = 0;
while(isRunning())
{
//Start waiting as nothing to do in runup //Start waiting as nothing to do in runup
usleep((int) (timeDelta * 1000 * 1000)); // timeDelta sec usleep((int) (timeDelta * 1000 * 1000)); // timeDelta sec
if(updateCounter >= 20) if(mUpdateCounter >= 20)
{ {
updateServerSyncTS(); updateServerSyncTS();
updateCounter = 0; mUpdateCounter = 0;
} }
else else
updateCounter++; mUpdateCounter++;
// process active transactions // process active transactions
processTransactions(); processTransactions();
@ -1177,8 +1175,6 @@ void RsGxsNetService::run()
runVetting(); runVetting();
processExplicitGroupRequests(); processExplicitGroupRequests();
}
} }
void RsGxsNetService::updateServerSyncTS() void RsGxsNetService::updateServerSyncTS()

View File

@ -168,7 +168,7 @@ public:
/*! /*!
* Processes transactions and job queue * Processes transactions and job queue
*/ */
void run(); virtual void data_tick();
private: private:
/*! /*!
@ -477,6 +477,7 @@ private:
uint32_t mLastKeyPublishTs; uint32_t mLastKeyPublishTs;
const uint32_t mSYNC_PERIOD; const uint32_t mSYNC_PERIOD;
int mUpdateCounter ;
RsGcxs* mCircles; RsGcxs* mCircles;
RsGixsReputation* mReputations; RsGixsReputation* mReputations;

View File

@ -61,7 +61,7 @@ inline RsGxsGrpMsgIdPair getMsgIdPair(RsGxsMsgItem& msg)
* Does message clean up based on individual group expirations first * Does message clean up based on individual group expirations first
* if avialable. If not then deletion s * if avialable. If not then deletion s
*/ */
class RsGxsMessageCleanUp : public RsThread class RsGxsMessageCleanUp //: public RsThread
{ {
public: public:
@ -84,7 +84,7 @@ public:
/*! /*!
* TODO: Rather than manual progressions consider running through a thread * TODO: Rather than manual progressions consider running through a thread
*/ */
void run(){} //virtual void data_tick(){}
private: private:
@ -97,7 +97,7 @@ private:
* Checks the integrity message and groups * Checks the integrity message and groups
* in rsDataService using computed hash * in rsDataService using computed hash
*/ */
class RsGxsIntegrityCheck : public RsThread class RsGxsIntegrityCheck : public RsSingleJobThread
{ {
enum CheckState { CheckStart, CheckChecking }; enum CheckState { CheckStart, CheckChecking };

View File

@ -130,6 +130,7 @@ AuthGPG::AuthGPG(const std::string& path_to_public_keyring,const std::string& pa
{ {
_force_sync_database = false ; _force_sync_database = false ;
start(); start();
int mCount = 0;
} }
/* This function is called when retroshare is first started /* This function is called when retroshare is first started
@ -178,30 +179,26 @@ int AuthGPG::GPGInit(const RsPgpId &ownId)
{ {
} }
void AuthGPG::run() void AuthGPG::data_tick()
{ {
int count = 0; usleep(100 * 1000); //100 msec
while (isRunning()) { /// every 100 milliseconds
usleep(100 * 1000); //100 msec processServices();
/// every 100 milliseconds /// every ten seconds
processServices(); if (++mCount >= 100 || _force_sync_database) {
RsStackMutex stack(gpgMtxService); ///******* LOCKED ******
/// every ten seconds /// The call does multiple things at once:
if (++count >= 100 || _force_sync_database) { /// - checks whether the keyring has changed in memory
RsStackMutex stack(gpgMtxService); ///******* LOCKED ****** /// - checks whether the keyring has changed on disk.
/// - merges/updates according to status.
/// The call does multiple things at once: ///
/// - checks whether the keyring has changed in memory PGPHandler::syncDatabase() ;
/// - checks whether the keyring has changed on disk. mCount = 0;
/// - merges/updates according to status. _force_sync_database = false ;
/// }//if (++count >= 100 || _force_sync_database)
PGPHandler::syncDatabase() ;
count = 0;
_force_sync_database = false ;
}//if (++count >= 100 || _force_sync_database)
}//while (isRunning())
} }
void AuthGPG::processServices() void AuthGPG::processServices()

View File

@ -96,7 +96,7 @@ public:
virtual void setGPGOperation(AuthGPGOperation *operation) = 0; virtual void setGPGOperation(AuthGPGOperation *operation) = 0;
}; };
class AuthGPG: public p3Config, public RsThread, public PGPHandler class AuthGPG: public p3Config, public RsTickingThread, public PGPHandler
{ {
public: public:
@ -274,7 +274,7 @@ class AuthGPG: public p3Config, public RsThread, public PGPHandler
bool printOwnKeys_locked(); bool printOwnKeys_locked();
/* own thread */ /* own thread */
virtual void run(); virtual void data_tick();
private: private:
@ -295,7 +295,8 @@ class AuthGPG: public p3Config, public RsThread, public PGPHandler
RsPgpId mOwnGpgId; RsPgpId mOwnGpgId;
bool gpgKeySelected; bool gpgKeySelected;
bool _force_sync_database ; bool _force_sync_database ;
uint32_t mCount ;
std::list<AuthGPGService*> services ; std::list<AuthGPGService*> services ;

View File

@ -53,7 +53,7 @@ int pqithreadstreamer::tick()
return 0; return 0;
} }
int pqithreadstreamer::data_tick() void pqithreadstreamer::data_tick()
{ {
uint32_t recv_timeout = 0; uint32_t recv_timeout = 0;
uint32_t sleep_period = 0; uint32_t sleep_period = 0;
@ -68,7 +68,7 @@ int pqithreadstreamer::data_tick()
if (!isactive) if (!isactive)
{ {
usleep(DEFAULT_STREAMER_IDLE_SLEEP); usleep(DEFAULT_STREAMER_IDLE_SLEEP);
return 0; return ;
} }
{ {
@ -92,7 +92,6 @@ int pqithreadstreamer::data_tick()
{ {
usleep(sleep_period); usleep(sleep_period);
} }
return 1;
} }

View File

@ -29,27 +29,25 @@
#include "pqi/pqistreamer.h" #include "pqi/pqistreamer.h"
#include "util/rsthreads.h" #include "util/rsthreads.h"
class pqithreadstreamer: public pqistreamer, public RsThread class pqithreadstreamer: public pqistreamer, public RsTickingThread
{ {
public: public:
pqithreadstreamer(PQInterface *parent, RsSerialiser *rss, const RsPeerId& peerid, BinInterface *bio_in, int bio_flagsin); pqithreadstreamer(PQInterface *parent, RsSerialiser *rss, const RsPeerId& peerid, BinInterface *bio_in, int bio_flagsin);
virtual bool RecvItem(RsItem *item); // from pqistreamer
virtual int tick(); virtual bool RecvItem(RsItem *item);
virtual int tick();
protected: protected:
// from RsThread virtual void data_tick();
virtual void run();
int data_tick(); PQInterface *mParent;
uint32_t mTimeout;
PQInterface *mParent; uint32_t mSleepPeriod;
uint32_t mTimeout;
uint32_t mSleepPeriod;
private: private:
/* thread variables */ /* thread variables */
RsMutex mThreadMutex; RsMutex mThreadMutex;
}; };
#endif //MRK_PQI_THREAD_STREAMER_HEADER #endif //MRK_PQI_THREAD_STREAMER_HEADER

View File

@ -71,7 +71,7 @@ void RsServer::ConfigFinalSave()
mConfigMgr->completeConfiguration(); mConfigMgr->completeConfiguration();
} }
void RsServer::startServiceThread(RsThread *t) void RsServer::startServiceThread(RsTickingThread *t)
{ {
t->start() ; t->start() ;
mRegisteredServiceThreads.push_back(t) ; mRegisteredServiceThreads.push_back(t) ;
@ -87,13 +87,13 @@ void RsServer::rsGlobalShutDown()
mNetMgr->shutdown(); /* Handles UPnP */ mNetMgr->shutdown(); /* Handles UPnP */
join(); fullstop() ;
// kill all registered service threads // kill all registered service threads
for(std::list<RsThread*>::iterator it= mRegisteredServiceThreads.begin();it!=mRegisteredServiceThreads.end();++it) for(std::list<RsTickingThread*>::iterator it= mRegisteredServiceThreads.begin();it!=mRegisteredServiceThreads.end();++it)
{ {
(*it)->join() ; (*it)->fullstop() ;
} }
// #ifdef RS_ENABLE_GXS // #ifdef RS_ENABLE_GXS
// // We should automate this. // // We should automate this.

View File

@ -49,6 +49,25 @@ int rsserverzone = 101;
****/ ****/
#define WARN_BIG_CYCLE_TIME (0.2) #define WARN_BIG_CYCLE_TIME (0.2)
#ifdef WINDOWS_SYS
#include <time.h>
#include <sys/timeb.h>
#endif
static double getCurrentTS()
{
#ifndef WINDOWS_SYS
struct timeval cts_tmp;
gettimeofday(&cts_tmp, NULL);
double cts = (cts_tmp.tv_sec) + ((double) cts_tmp.tv_usec) / 1000000.0;
#else
struct _timeb timebuf;
_ftime( &timebuf);
double cts = (timebuf.time) + ((double) timebuf.millitm) / 1000.0;
#endif
return cts;
}
RsServer::RsServer() RsServer::RsServer()
@ -74,22 +93,20 @@ RsServer::RsServer()
chatSrv = NULL; chatSrv = NULL;
mStatusSrv = NULL; mStatusSrv = NULL;
/* caches (that need ticking) */ mMin = 0;
mLoop = 0;
mAvgTickRate = mTimeDelta;
mLastts = getCurrentTS();
mLastSec = 0; /* for the slower ticked stuff */
mTimeDelta = 0.25 ;
/* caches (that need ticking) */
/* Config */ /* Config */
mConfigMgr = NULL; mConfigMgr = NULL;
mGeneralConfig = NULL; mGeneralConfig = NULL;
/* GXS - Amazingly we can still initialise these
* even without knowing the data-types (they are just pointers???)
*/
// mPhoto = NULL;
// mWiki = NULL;
// mPosted = NULL;
// mGxsCircles = NULL;
// mGxsIdService = NULL;
// mGxsForums = NULL;
// mWire = NULL;
} }
RsServer::~RsServer() RsServer::~RsServer()
@ -101,182 +118,143 @@ RsServer::~RsServer()
----> MUST BE LOCKED! ----> MUST BE LOCKED!
*/ */
#ifdef WINDOWS_SYS
#include <time.h>
#include <sys/timeb.h>
#endif
static double getCurrentTS()
{
#ifndef WINDOWS_SYS
struct timeval cts_tmp;
gettimeofday(&cts_tmp, NULL);
double cts = (cts_tmp.tv_sec) + ((double) cts_tmp.tv_usec) / 1000000.0;
#else
struct _timeb timebuf;
_ftime( &timebuf);
double cts = (timebuf.time) + ((double) timebuf.millitm) / 1000.0;
#endif
return cts;
}
/* Thread Fn: Run the Core */ /* Thread Fn: Run the Core */
void RsServer::run() void RsServer::data_tick()
{ {
double timeDelta = 0.25;
double minTimeDelta = 0.1; // 25;
double maxTimeDelta = 0.5;
double kickLimit = 0.15;
double avgTickRate = timeDelta;
double lastts, ts;
lastts = ts = getCurrentTS();
long lastSec = 0; /* for the slower ticked stuff */
int min = 0;
int loop = 0;
while(isRunning())
{
#ifndef WINDOWS_SYS #ifndef WINDOWS_SYS
usleep((int) (timeDelta * 1000000)); usleep((int) (mTimeDelta * 1000000));
#else #else
Sleep((int) (timeDelta * 1000)); Sleep((int) (mTimeDelta * 1000));
#endif #endif
ts = getCurrentTS(); double ts = getCurrentTS();
double delta = ts - lastts; double delta = ts - mLastts;
/* for the fast ticked stuff */ /* for the fast ticked stuff */
if (delta > timeDelta) if (delta > mTimeDelta)
{ {
#ifdef DEBUG_TICK #ifdef DEBUG_TICK
std::cerr << "Delta: " << delta << std::endl; std::cerr << "Delta: " << delta << std::endl;
std::cerr << "Time Delta: " << timeDelta << std::endl; std::cerr << "Time Delta: " << mTimeDelta << std::endl;
std::cerr << "Avg Tick Rate: " << avgTickRate << std::endl; std::cerr << "Avg Tick Rate: " << mAvgTickRate << std::endl;
#endif #endif
lastts = ts; mLastts = ts;
/******************************** RUN SERVER *****************/ /******************************** RUN SERVER *****************/
lockRsCore(); lockRsCore();
int moreToTick = pqih->tick(); int moreToTick = pqih->tick();
#ifdef DEBUG_TICK #ifdef DEBUG_TICK
std::cerr << "RsServer::run() ftserver->tick(): moreToTick: " << moreToTick << std::endl; std::cerr << "RsServer::run() ftserver->tick(): moreToTick: " << moreToTick << std::endl;
#endif #endif
unlockRsCore(); unlockRsCore();
/* tick the Managers */ /* tick the Managers */
mPeerMgr->tick(); mPeerMgr->tick();
mLinkMgr->tick(); mLinkMgr->tick();
mNetMgr->tick(); mNetMgr->tick();
/******************************** RUN SERVER *****************/ /******************************** RUN SERVER *****************/
/* adjust tick rate depending on whether there is more. /* adjust tick rate depending on whether there is more.
*/ */
avgTickRate = 0.2 * timeDelta + 0.8 * avgTickRate; mAvgTickRate = 0.2 * mTimeDelta + 0.8 * mAvgTickRate;
if (1 == moreToTick) if (1 == moreToTick)
{ {
timeDelta = 0.9 * avgTickRate; mTimeDelta = 0.9 * mAvgTickRate;
if (timeDelta > kickLimit) if (mTimeDelta > kickLimit)
{ {
/* force next tick in one sec /* force next tick in one sec
* if we are reading data. * if we are reading data.
*/ */
timeDelta = kickLimit; mTimeDelta = kickLimit;
avgTickRate = kickLimit; mAvgTickRate = kickLimit;
} }
} }
else else
{ {
timeDelta = 1.1 * avgTickRate; mTimeDelta = 1.1 * mAvgTickRate;
} }
/* limiter */ /* limiter */
if (timeDelta < minTimeDelta) if (mTimeDelta < minTimeDelta)
{ {
timeDelta = minTimeDelta; mTimeDelta = minTimeDelta;
} }
else if (timeDelta > maxTimeDelta) else if (mTimeDelta > maxTimeDelta)
{ {
timeDelta = maxTimeDelta; mTimeDelta = maxTimeDelta;
} }
/* Fast Updates */ /* Fast Updates */
/* now we have the slow ticking stuff */ /* now we have the slow ticking stuff */
/* stuff ticked once a second (but can be slowed down) */ /* stuff ticked once a second (but can be slowed down) */
if ((int) ts > lastSec) if ((int) ts > mLastSec)
{ {
lastSec = (int) ts; mLastSec = (int) ts;
// Every second! (UDP keepalive). // Every second! (UDP keepalive).
//tou_tick_stunkeepalive(); //tou_tick_stunkeepalive();
// every five loops (> 5 secs) // every five loops (> 5 secs)
if (loop % 5 == 0) if (mLoop % 5 == 0)
{ {
// update_quick_stats(); // update_quick_stats();
// Update All Every 5 Seconds. // Update All Every 5 Seconds.
// These Update Functions do the locking themselves. // These Update Functions do the locking themselves.
#ifdef DEBUG_TICK #ifdef DEBUG_TICK
std::cerr << "RsServer::run() Updates()" << std::endl; std::cerr << "RsServer::run() Updates()" << std::endl;
#endif #endif
mConfigMgr->tick(); /* saves stuff */ mConfigMgr->tick(); /* saves stuff */
} }
// every 60 loops (> 1 min) // every 60 loops (> 1 min)
if (++loop >= 60) if (++mLoop >= 60)
{ {
loop = 0; mLoop = 0;
/* force saving FileTransferStatus TODO */ /* force saving FileTransferStatus TODO */
//ftserver->saveFileTransferStatus(); //ftserver->saveFileTransferStatus();
/* see if we need to resave certs */ /* see if we need to resave certs */
//AuthSSL::getAuthSSL()->CheckSaveCertificates(); //AuthSSL::getAuthSSL()->CheckSaveCertificates();
/* hour loop */ /* hour loop */
if (++min >= 60) if (++mMin >= 60)
{ {
min = 0; mMin = 0;
} }
} }
/* Tick slow services */ /* Tick slow services */
if(rsPlugins) if(rsPlugins)
rsPlugins->slowTickPlugins((time_t)ts); rsPlugins->slowTickPlugins((time_t)ts);
// slow update tick as well. // slow update tick as well.
// update(); // update();
} // end of slow tick. } // end of slow tick.
} // end of only once a second. } // end of only once a second.
double endCycleTs = getCurrentTS(); double endCycleTs = getCurrentTS();
double cycleTime = endCycleTs - ts; double cycleTime = endCycleTs - ts;
if (cycleTime > WARN_BIG_CYCLE_TIME) if (cycleTime > WARN_BIG_CYCLE_TIME)
{ {
std::string out; std::string out;
rs_sprintf(out, "RsServer::run() WARNING Excessively Long Cycle Time: %g secs => Please DEBUG", cycleTime); rs_sprintf(out, "RsServer::run() WARNING Excessively Long Cycle Time: %g secs => Please DEBUG", cycleTime);
std::cerr << out << std::endl; std::cerr << out << std::endl;
rslog(RSL_ALERT, rsserverzone, out); rslog(RSL_ALERT, rsserverzone, out);
} }
}
return;
} }

View File

@ -74,7 +74,7 @@ class RsPluginManager;
//int InitRetroShare(int argc, char **argv, RsInit *config); //int InitRetroShare(int argc, char **argv, RsInit *config);
//int LoadCertificates(RsInit *config); //int LoadCertificates(RsInit *config);
class RsServer: public RsControl, public RsThread class RsServer: public RsControl, public RsTickingThread
{ {
public: public:
/****************************************/ /****************************************/
@ -88,7 +88,7 @@ class RsServer: public RsControl, public RsThread
virtual ~RsServer(); virtual ~RsServer();
/* Thread Fn: Run the Core */ /* Thread Fn: Run the Core */
virtual void run(); virtual void data_tick();
/* locking stuff */ /* locking stuff */
void lockRsCore() void lockRsCore()
@ -120,7 +120,7 @@ class RsServer: public RsControl, public RsThread
/* Config */ /* Config */
virtual void ConfigFinalSave( ); virtual void ConfigFinalSave( );
virtual void startServiceThread(RsThread *t) ; virtual void startServiceThread(RsTickingThread *t) ;
/************* Rs shut down function: in upnp 'port lease time' bug *****************/ /************* Rs shut down function: in upnp 'port lease time' bug *****************/
@ -162,7 +162,7 @@ class RsServer: public RsControl, public RsThread
// This list contains all threaded services. It will be used to shut them down properly. // This list contains all threaded services. It will be used to shut them down properly.
std::list<RsThread*> mRegisteredServiceThreads ; std::list<RsTickingThread*> mRegisteredServiceThreads ;
/* GXS */ /* GXS */
// p3Wiki *mWiki; // p3Wiki *mWiki;
@ -184,6 +184,16 @@ class RsServer: public RsControl, public RsThread
// Worker Data..... // Worker Data.....
int mMin ;
int mLoop ;
int mLastts ;
long mLastSec ;
double mAvgTickRate ;
double mTimeDelta ;
static const double minTimeDelta = 0.1; // 25;
static const double maxTimeDelta = 0.5;
static const double kickLimit = 0.15;
}; };
/* Helper function to convert windows paths /* Helper function to convert windows paths

View File

@ -157,7 +157,7 @@ virtual bool send(RsRawItem *item)
}; };
class p3ThreadedService: public p3Service, public RsThread class p3ThreadedService: public p3Service, public RsTickingThread
{ {
protected: protected:

View File

@ -56,11 +56,33 @@ void *RsThread::rsthread_init(void* p)
{ {
return NULL; return NULL;
} }
thread -> run(); // tell the OS to free the thread resources when this function exits
// it is a replacement for pthread_join()
pthread_detach(pthread_self());
thread -> runloop();
return NULL; return NULL;
} }
RsThread::RsThread () : mMutex("RsThread")
{
sem_init(&mHasStoppedSemaphore,0,1) ;
void RsThread::shutdown() #ifdef WINDOWS_SYS
memset (&mTid, 0, sizeof(mTid));
#else
mTid = 0;
#endif
}
bool RsThread::isRunning()
{
// do we need a mutex for this ?
int sval =0;
sem_getvalue(&mHasStoppedSemaphore,&sval) ;
return !sval ;
}
void RsTickingThread::shutdown()
{ {
#ifdef DEBUG_THREADS #ifdef DEBUG_THREADS
std::cerr << "pqithreadstreamer::stop()" << std::endl; std::cerr << "pqithreadstreamer::stop()" << std::endl;
@ -83,7 +105,7 @@ void RsThread::shutdown()
sem_post(&mShouldStopSemaphore) ; sem_post(&mShouldStopSemaphore) ;
} }
void RsThread::fullstop() void RsTickingThread::fullstop()
{ {
shutdown() ; shutdown() ;
@ -107,7 +129,6 @@ void RsThread::start()
std::cerr << " initing should_stop=0" << std::endl; std::cerr << " initing should_stop=0" << std::endl;
std::cerr << " initing has_stopped=1" << std::endl; std::cerr << " initing has_stopped=1" << std::endl;
#endif #endif
sem_init(&mShouldStopSemaphore,0,0) ;
sem_init(&mHasStoppedSemaphore,0,0) ; sem_init(&mHasStoppedSemaphore,0,0) ;
int err ; int err ;
@ -125,37 +146,19 @@ void RsThread::start()
} }
RsThread::RsThread () : mMutex("RsThread")
RsTickingThread::RsTickingThread ()
{ {
sem_init(&mShouldStopSemaphore,0,0) ; sem_init(&mShouldStopSemaphore,0,0) ;
sem_init(&mHasStoppedSemaphore,0,1) ;
#ifdef WINDOWS_SYS
memset (&mTid, 0, sizeof(mTid));
#else
mTid = 0;
#endif
} }
bool RsThread::isRunning() void RsTickingThread::runloop()
{
// do we need a mutex for this ?
int sval =0;
sem_getvalue(&mHasStoppedSemaphore,&sval) ;
return !sval ;
}
void RsThread::run()
{ {
#ifdef DEBUG_THREADS #ifdef DEBUG_THREADS
std::cerr << "pqithreadstream::run()"; std::cerr << "pqithreadstream::run()";
std::cerr << std::endl; std::cerr << std::endl;
#endif #endif
sem_init(&mShouldStopSemaphore,0,0) ;
// tell the OS to free the thread resources when this function exits
// it is a replacement for pthread_join()
pthread_detach(pthread_self());
while(1) while(1)
{ {

View File

@ -177,31 +177,50 @@ pthread_t createThread(RsThread &thread);
class RsThread class RsThread
{ {
public: public:
RsThread(); RsThread();
virtual ~RsThread() {} virtual ~RsThread() {}
void start() ; void start() ;
void shutdown();
void fullstop();
void join() { fullstop() ; } // used for compatibility
bool isRunning(); bool isRunning();
protected: protected:
void run() ; /* called once the thread is started. Should be overloaded by subclasses. */ virtual void runloop() =0; /* called once the thread is started. Should be overloaded by subclasses. */
private:
static void *rsthread_init(void*) ;
pthread_t mTid;
RsMutex mMutex;
sem_t mShouldStopSemaphore;
sem_t mHasStoppedSemaphore; sem_t mHasStoppedSemaphore;
static void *rsthread_init(void*) ;
RsMutex mMutex;
pthread_t mTid;
}; };
class RsTickingThread: public RsThread
{
public:
RsTickingThread();
class RsQueueThread: public RsThread void shutdown();
void fullstop();
void join() { fullstop() ; } // used for compatibility
virtual void data_tick() =0;
private:
virtual void runloop() ; /* called once the thread is started. Should be overloaded by subclasses. */
sem_t mShouldStopSemaphore;
};
class RsSingleJobThread: public RsThread
{
public:
virtual void run() =0;
protected:
virtual void runloop() { run(); } /* called once the thread is started. Should be overloaded by subclasses. */
};
class RsQueueThread: public RsTickingThread
{ {
public: public:

View File

@ -37,7 +37,7 @@ enum FeedFormat { FORMAT_RSS, FORMAT_RDF, FORMAT_ATOM };
*********/ *********/
p3FeedReaderThread::p3FeedReaderThread(p3FeedReader *feedReader, Type type, const std::string &feedId) : p3FeedReaderThread::p3FeedReaderThread(p3FeedReader *feedReader, Type type, const std::string &feedId) :
RsThread(), mFeedReader(feedReader), mType(type), mFeedId(feedId) RsTickingThread(), mFeedReader(feedReader), mType(type), mFeedId(feedId)
{ {
} }
@ -49,9 +49,8 @@ p3FeedReaderThread::~p3FeedReaderThread()
/****************************** Thread *************************************/ /****************************** Thread *************************************/
/***************************************************************************/ /***************************************************************************/
void p3FeedReaderThread::run() void p3FeedReaderThread::data_tick()
{ {
while (isRunning()) {
#ifdef WIN32 #ifdef WIN32
Sleep(1000); Sleep(1000);
#else #else
@ -148,7 +147,6 @@ void p3FeedReaderThread::run()
} }
break; break;
} }
}
} }
/***************************************************************************/ /***************************************************************************/

View File

@ -33,7 +33,7 @@ class RsFeedReaderMsg;
class HTMLWrapper; class HTMLWrapper;
class RsFeedReaderXPath; class RsFeedReaderXPath;
class p3FeedReaderThread : public RsThread class p3FeedReaderThread : public RsTickingThread
{ {
public: public:
enum Type enum Type
@ -56,7 +56,7 @@ public:
static RsFeedReaderErrorState processTransformation(const RsFeedReaderFeed &feed, RsFeedReaderMsg *msg, std::string &errorString); static RsFeedReaderErrorState processTransformation(const RsFeedReaderFeed &feed, RsFeedReaderMsg *msg, std::string &errorString);
private: private:
virtual void run(); virtual void data_tick();
RsFeedReaderErrorState download(const RsFeedReaderFeed &feed, std::string &content, std::string &icon, std::string &errorString); RsFeedReaderErrorState download(const RsFeedReaderFeed &feed, std::string &content, std::string &icon, std::string &errorString);
RsFeedReaderErrorState process(const RsFeedReaderFeed &feed, std::list<RsFeedReaderMsg*> &entries, std::string &errorString); RsFeedReaderErrorState process(const RsFeedReaderFeed &feed, std::list<RsFeedReaderMsg*> &entries, std::string &errorString);

View File

@ -93,7 +93,7 @@ TerminalApiClient::TerminalApiClient(ApiServer *api):
TerminalApiClient::~TerminalApiClient() TerminalApiClient::~TerminalApiClient()
{ {
join(); //join();
} }
void TerminalApiClient::run() void TerminalApiClient::run()

View File

@ -8,7 +8,7 @@ namespace resource_api {
// - account selection // - account selection
// - login // - login
// - shutdown // - shutdown
class TerminalApiClient: private RsThread{ class TerminalApiClient: private RsSingleJobThread{
public: public:
// zero setup: create an instance of this class and destroy it when not needed anymore // zero setup: create an instance of this class and destroy it when not needed anymore
// no need to call start or stop or something // no need to call start or stop or something