diff --git a/libretroshare/src/pqi/pqistreamer.cc b/libretroshare/src/pqi/pqistreamer.cc index 76f5b1581..9694f9ce3 100644 --- a/libretroshare/src/pqi/pqistreamer.cc +++ b/libretroshare/src/pqi/pqistreamer.cc @@ -48,8 +48,10 @@ static struct RsLog::logInfo pqistreamerzoneInfo = {RsLog::Default, "pqistreamer #define pqistreamerzone &pqistreamerzoneInfo static const int PQISTREAM_ABS_MAX = 100000000; /* 100 MB/sec (actually per loop) */ -static const int PQISTREAM_AVG_PERIOD = 5; // update speed estimate every 5 seconds +static const int PQISTREAM_AVG_PERIOD = 1; // update speed estimate every second static const float PQISTREAM_AVG_FRAC = 0.8; // for bandpass filter over speed estimate. +static const float PQISTREAM_AVG_DT_FRAC = 0.99; // for low pass filter over elapsed time + static const int PQISTREAM_OPTIMAL_PACKET_SIZE = 512; // It is believed that this value should be lower than TCP slices and large enough as compare to encryption padding. // most importantly, it should be constant, so as to allow correct QoS. static const int PQISTREAM_SLICE_FLAG_STARTS = 0x01; // @@ -100,7 +102,8 @@ pqistreamer::pqistreamer(RsSerialiser *rss, const RsPeerId& id, BinInterface *bi mPkt_wpending(NULL), mPkt_wpending_size(0), mTotalRead(0), mTotalSent(0), mCurrRead(0), mCurrSent(0), - mAvgReadCount(0), mAvgSentCount(0) + mAvgReadCount(0), mAvgSentCount(0), + mAvgDtOut(0), mAvgDtIn(0) { // 100 B/s (minimal) @@ -114,8 +117,7 @@ pqistreamer::pqistreamer(RsSerialiser *rss, const RsPeerId& id, BinInterface *bi mAcceptsPacketSlicing = false ; // by default. Will be turned into true when everyone's ready. mLastSentPacketSlicingProbe = 0 ; - mAvgLastUpdate = time(NULL); - mCurrSentTS = mCurrReadTS = getCurrentTS(); + mAvgLastUpdate = mCurrSentTS = mCurrReadTS = getCurrentTS(); mIncomingSize = 0 ; @@ -231,47 +233,46 @@ void pqistreamer::setRate(bool b,float f) void pqistreamer::updateRates() { - // now update rates both ways. + // update rates both ways. - time_t t = time(NULL); // get current timestep. - int64_t diff ; + double t = getCurrentTS(); // get current timestamp. + double diff ; - { - RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ + { + RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ + diff = t - mAvgLastUpdate ; + } - diff = int64_t(t) - int64_t(mAvgLastUpdate) ; - } - - if (diff > PQISTREAM_AVG_PERIOD) - { - float avgReadpSec = getRate(true ) * PQISTREAM_AVG_FRAC + (1.0 - PQISTREAM_AVG_FRAC) * mAvgReadCount/(1024.0 * float(diff)); - float avgSentpSec = getRate(false) * PQISTREAM_AVG_FRAC + (1.0 - PQISTREAM_AVG_FRAC) * mAvgSentCount/(1024.0 * float(diff)); + if (diff > PQISTREAM_AVG_PERIOD) + { + float avgReadpSec = PQISTREAM_AVG_FRAC * getRate(true ) + (1.0 - PQISTREAM_AVG_FRAC) * mAvgReadCount/(1024.0 * diff); + float avgSentpSec = PQISTREAM_AVG_FRAC * getRate(false) + (1.0 - PQISTREAM_AVG_FRAC) * mAvgSentCount/(1024.0 * diff); #ifdef DEBUG_PQISTREAMER - std::cerr << "Peer " << PeerId() << ": Current speed estimates: " << avgReadpSec << " / " << avgSentpSec << std::endl; + uint64_t t_now = 1000 * getCurrentTS(); + std::cerr << std::dec << t_now << " DEBUG_PQISTREAMER pqistreamer::updateRates PeerId " << this->PeerId().toStdString() << " Current speed estimates: down " << std::dec << (int)(1024 * avgReadpSec) << " B/s / up " << (int)(1024 * avgSentpSec) << " B/s" << std::endl; #endif - /* pretend our rate is zero if we are - * not bandwidthLimited(). - */ - if (mBio->bandwidthLimited()) - { - setRate(true, avgReadpSec); - setRate(false, avgSentpSec); - } - else - { - std::cerr << "Warning: setting to 0" << std::endl; - setRate(true, 0); - setRate(false, 0); - } - { - RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ - mAvgLastUpdate = t; - mAvgReadCount = 0; - mAvgSentCount = 0; - } - } + // now store the new rates, zero meaning that we are not bandwidthLimited() + + if (mBio->bandwidthLimited()) + { + setRate(true, avgReadpSec); + setRate(false, avgSentpSec); + } + else + { + setRate(true, 0); + setRate(false, 0); + } + + { + RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ + mAvgLastUpdate = t; + mAvgReadCount = 0; + mAvgSentCount = 0; + } + } } int pqistreamer::tick_bio() @@ -1114,12 +1115,11 @@ float pqistreamer::outTimeSlice_locked() return 1; } -// very simple..... int pqistreamer::outAllowedBytes_locked() { - double t = getCurrentTS() ; // Grabs today's time in sec, with ms accuracy. Allows a much more accurate allocation of bw + double t = getCurrentTS() ; // in sec, with high accuracy - /* allow a lot if not bandwidthLimited */ + // allow a lot if not bandwidthLimited() if (!mBio->bandwidthLimited()) { mCurrSent = 0; @@ -1127,70 +1127,80 @@ int pqistreamer::outAllowedBytes_locked() return PQISTREAM_ABS_MAX; } - double dt = t - mCurrSentTS; + // dt is the time elapsed since the last round of sending data + double dt = t - mCurrSentTS; - // limiter -> for when currSentTs -> 0. - if (dt > 5) - dt = 5; + // ignore cases where dt > 1s + if (dt > 1) + dt = 1; - double maxout = getMaxRate(false) * 1024.0; + // low pass filter on mAvgDtOut + mAvgDtOut = PQISTREAM_AVG_DT_FRAC * mAvgDtOut + (1 - PQISTREAM_AVG_DT_FRAC) * dt; + + double maxout = getMaxRate(false) * 1024.0; - mCurrSent -= int(dt * maxout); + // this is used to take into account a possible excess of data sent during the previous round + mCurrSent -= int(dt * maxout); if (mCurrSent < 0) mCurrSent = 0; mCurrSentTS = t; + // now calculate the max amount of data allowed to be sent during the next round + // we limit this quota to what should be sent at most during mAvgDtOut, taking into account the excess of data possibly sent during the previous round + double quota = mAvgDtOut * maxout - mCurrSent; + #ifdef DEBUG_PQISTREAMER - { - std::string out; - rs_sprintf(out, "pqistreamer::outAllowedBytes() is %d/%d", maxout - mCurrSent, maxout); - pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out); - } + uint64_t t_now = 1000 * getCurrentTS(); + std::cerr << std::dec << t_now << " DEBUG_PQISTREAMER pqistreamer::outAllowedBytes_locked PeerId " << this->PeerId().toStdString() << " dt " << (int)(1000 * dt) << "ms, mAvgDtOut " << (int)(1000 * mAvgDtOut) << "ms, maxout " << (int)(maxout) << " bytes/s, mCurrSent " << mCurrSent << " bytes, quota " << (int)(quota) << " bytes" << std::endl; #endif - - return maxout - mCurrSent; + return quota; } int pqistreamer::inAllowedBytes_locked() { - double t = getCurrentTS(); // in secs, with a ms accuracy + double t = getCurrentTS(); // in sec, with high accuracy - /* allow a lot if not bandwidthLimited */ + // allow a lot if not bandwidthLimited() if (!mBio->bandwidthLimited()) { + mCurrRead = 0; mCurrReadTS = t; - mCurrRead = 0; return PQISTREAM_ABS_MAX; } - double dt = t - mCurrReadTS; + // dt is the time elapsed since the last round of receiving data + double dt = t - mCurrReadTS; - // limiter -> for when currReadTs -> 0. - if (dt > 5) - dt = 5; + // limit dt to 1s + if (dt > 1) + dt = 1; - double maxin = getMaxRate(true) * 1024.0; + // low pass filter on mAvgDtIn + mAvgDtIn = PQISTREAM_AVG_DT_FRAC * mAvgDtIn + (1 - PQISTREAM_AVG_DT_FRAC) * dt; - mCurrRead -= int(dt * maxin); + double maxin = getMaxRate(true) * 1024.0; + + // this is used to take into account a possible excess of data received during the previous round + mCurrRead -= int(dt * maxin); if (mCurrRead < 0) mCurrRead = 0; mCurrReadTS = t; + // now calculate the max amount of data allowed to be received during the next round + // we limit this quota to what should be received at most during mAvgDtOut, taking into account the excess of data possibly received during the previous round + double quota = mAvgDtIn * maxin - mCurrRead; + #ifdef DEBUG_PQISTREAMER - { - std::string out; - rs_sprintf(out, "pqistreamer::inAllowedBytes() is %d/%d", maxin - mCurrRead, maxin); - pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out); - } + uint64_t t_now = 1000 * getCurrentTS(); + std::cerr << std::dec << t_now << " DEBUG_PQISTREAMER pqistreamer::inAllowedBytes_locked PeerId " << this->PeerId().toStdString() << " dt " << (int)(1000 * dt) << "ms, mAvgDtIn " << (int)(1000 * mAvgDtIn) << "ms, maxin " << (int)(maxin) << " bytes/s, mCurrRead " << mCurrRead << " bytes, quota " << (int)(quota) << " bytes" << std::endl; #endif - - return maxin - mCurrRead; + return quota; } diff --git a/libretroshare/src/pqi/pqistreamer.h b/libretroshare/src/pqi/pqistreamer.h index b4351670c..fe2754854 100644 --- a/libretroshare/src/pqi/pqistreamer.h +++ b/libretroshare/src/pqi/pqistreamer.h @@ -162,10 +162,13 @@ class pqistreamer: public PQInterface double mCurrReadTS; // TS from which these are measured. double mCurrSentTS; - time_t mAvgLastUpdate; // TS from which these are measured. + double mAvgLastUpdate; // TS from which these are measured. uint32_t mAvgReadCount; uint32_t mAvgSentCount; + double mAvgDtOut; // average time diff between 2 rounds of sending data + double mAvgDtIn; // average time diff between 2 rounds of receiving data + time_t mLastIncomingTs; // traffic statistics