diff --git a/libretroshare/src/pqi/pqithreadstreamer.cc b/libretroshare/src/pqi/pqithreadstreamer.cc index 6a84a2e0b..6745dc864 100644 --- a/libretroshare/src/pqi/pqithreadstreamer.cc +++ b/libretroshare/src/pqi/pqithreadstreamer.cc @@ -23,17 +23,17 @@ #include "pqi/pqithreadstreamer.h" #include -#define DEFAULT_STREAMER_TIMEOUT 10000 // 10 ms. -#define DEFAULT_STREAMER_SLEEP 1000 // 1 ms. +#define DEFAULT_STREAMER_TIMEOUT 10000 // 10 ms +#define DEFAULT_STREAMER_SLEEP 30000 // 30 ms #define DEFAULT_STREAMER_IDLE_SLEEP 1000000 // 1 sec -//#define PQISTREAMER_DEBUG +// #define PQISTREAMER_DEBUG pqithreadstreamer::pqithreadstreamer(PQInterface *parent, RsSerialiser *rss, const RsPeerId& id, BinInterface *bio_in, int bio_flags_in) :pqistreamer(rss, id, bio_in, bio_flags_in), mParent(parent), mTimeout(0), mThreadMutex("pqithreadstreamer") { - mTimeout = DEFAULT_STREAMER_TIMEOUT; - mSleepPeriod = DEFAULT_STREAMER_SLEEP; + mTimeout = DEFAULT_STREAMER_TIMEOUT; + mSleepPeriod = DEFAULT_STREAMER_SLEEP; } bool pqithreadstreamer::RecvItem(RsItem *item) @@ -43,9 +43,8 @@ bool pqithreadstreamer::RecvItem(RsItem *item) int pqithreadstreamer::tick() { -// pqithreadstreamer mutex lock is not needed here -// we are only checking if the connection is active, and if not active we will try to establish it -// RsStackMutex stack(mThreadMutex); + // pqithreadstreamer mutex lock is not needed here + // we will only check if the connection is active, and if not we will try to establish it tick_bio(); return 0; @@ -53,47 +52,50 @@ int pqithreadstreamer::tick() void pqithreadstreamer::threadTick() { - uint32_t recv_timeout = 0; - uint32_t sleep_period = 0; - bool isactive = false; - { - RsStackMutex stack(mStreamerMtx); - recv_timeout = mTimeout; - sleep_period = mSleepPeriod; - isactive = mBio->isactive(); - } + uint32_t recv_timeout = 0; + uint32_t sleep_period = 0; + bool isactive = false; + + { + RsStackMutex stack(mStreamerMtx); + recv_timeout = mTimeout; + sleep_period = mSleepPeriod; + isactive = mBio->isactive(); + } - updateRates() ; + // update the connection rates + updateRates() ; - if (!isactive) - { - rstime::rs_usleep(DEFAULT_STREAMER_IDLE_SLEEP); - return ; - } + // if the connection est not active, long sleep then return + if (!isactive) + { + rstime::rs_usleep(DEFAULT_STREAMER_IDLE_SLEEP); + return ; + } - { - RsStackMutex stack(mThreadMutex); - tick_recv(recv_timeout); - } + // fill incoming queue with items from SSL + { + RsStackMutex stack(mThreadMutex); + tick_recv(recv_timeout); + } - // Push Items, Outside of Mutex. - RsItem *incoming = NULL; - while((incoming = GetItem())) - { - RecvItem(incoming); - } + // move items to appropriate service queue or shortcut to fast service + RsItem *incoming = NULL; + while((incoming = GetItem())) + { + RecvItem(incoming); + } - { - RsStackMutex stack(mThreadMutex); - tick_send(0); - } + // parse the outgoing queue and send items to SSL + { + RsStackMutex stack(mThreadMutex); + tick_send(0); + } - if (sleep_period) - { - rstime::rs_usleep(sleep_period); - } + // sleep + if (sleep_period) + { + rstime::rs_usleep(sleep_period); + } } - - - diff --git a/libretroshare/src/rsserver/p3face-server.cc b/libretroshare/src/rsserver/p3face-server.cc index 0ed2f698c..9426b0471 100644 --- a/libretroshare/src/rsserver/p3face-server.cc +++ b/libretroshare/src/rsserver/p3face-server.cc @@ -143,13 +143,14 @@ void RsServer::threadTick() // if there is time left, we sleep double timeToSleep = mTickInterval - mAvgRunDuration; - if (timeToSleep > 0) - { +// never sleep less than 50 ms + if (timeToSleep < 0.050) + timeToSleep = 0.050; + #ifdef TICK_DEBUG - RsDbg() << "TICK_DEBUG will sleep " << timeToSleep << " ms" << std::endl; + RsDbg() << "TICK_DEBUG will sleep " << (int) (1000 * timeToSleep) << " ms" << std::endl; #endif - rstime::rs_usleep(timeToSleep * 1000000); - } + rstime::rs_usleep(timeToSleep * 1000000); double ts = getCurrentTS(); mLastts = ts; @@ -229,12 +230,16 @@ void RsServer::threadTick() // ticking is done, now compute new values of mLastRunDuration, mAvgRunDuration and mTickInterval ts = getCurrentTS(); mLastRunDuration = ts - mLastts; + +// low-pass filter and don't let mAvgRunDuration exceeds maxTickInterval mAvgRunDuration = 0.1 * mLastRunDuration + 0.9 * mAvgRunDuration; + if (mAvgRunDuration > maxTickInterval) + mAvgRunDuration = maxTickInterval; #ifdef TICK_DEBUG RsDbg() << "TICK_DEBUG new mLastRunDuration " << mLastRunDuration << " mAvgRunDuration " << mAvgRunDuration << std::endl; if (mLastRunDuration > WARN_BIG_CYCLE_TIME) - RsDbg() << "TICK_DEBUG excessively long lycle time " << mLastRunDuration << std::endl; + RsDbg() << "TICK_DEBUG excessively long cycle time " << mLastRunDuration << std::endl; #endif // if the core has returned that there is more to tick we decrease the ticking interval, else we increse it @@ -250,7 +255,7 @@ void RsServer::threadTick() RsDbg() << "TICK_DEBUG new tick interval " << mTickInterval << std::endl; #endif -// keep the tick interval within allowed limits +// keep the tick interval target within allowed limits if (mTickInterval < minTickInterval) mTickInterval = minTickInterval; else if (mTickInterval > maxTickInterval)