mirror of
https://github.com/RetroShare/RetroShare.git
synced 2025-01-15 09:27:09 -05:00
Attempt to reduce BW bursts
This commit is contained in:
parent
8568199497
commit
1b8de7b30e
@ -48,8 +48,10 @@ static struct RsLog::logInfo pqistreamerzoneInfo = {RsLog::Default, "pqistreamer
|
|||||||
#define pqistreamerzone &pqistreamerzoneInfo
|
#define pqistreamerzone &pqistreamerzoneInfo
|
||||||
|
|
||||||
static const int PQISTREAM_ABS_MAX = 100000000; /* 100 MB/sec (actually per loop) */
|
static const int PQISTREAM_ABS_MAX = 100000000; /* 100 MB/sec (actually per loop) */
|
||||||
static const int PQISTREAM_AVG_PERIOD = 5; // update speed estimate every 5 seconds
|
static const int PQISTREAM_AVG_PERIOD = 1; // update speed estimate every second
|
||||||
static const float PQISTREAM_AVG_FRAC = 0.8; // for bandpass filter over speed estimate.
|
static const float PQISTREAM_AVG_FRAC = 0.8; // for bandpass filter over speed estimate.
|
||||||
|
static const float PQISTREAM_AVG_DT_FRAC = 0.99; // for low pass filter over elapsed time
|
||||||
|
|
||||||
static const int PQISTREAM_OPTIMAL_PACKET_SIZE = 512; // It is believed that this value should be lower than TCP slices and large enough as compare to encryption padding.
|
static const int PQISTREAM_OPTIMAL_PACKET_SIZE = 512; // It is believed that this value should be lower than TCP slices and large enough as compare to encryption padding.
|
||||||
// most importantly, it should be constant, so as to allow correct QoS.
|
// most importantly, it should be constant, so as to allow correct QoS.
|
||||||
static const int PQISTREAM_SLICE_FLAG_STARTS = 0x01; //
|
static const int PQISTREAM_SLICE_FLAG_STARTS = 0x01; //
|
||||||
@ -100,7 +102,8 @@ pqistreamer::pqistreamer(RsSerialiser *rss, const RsPeerId& id, BinInterface *bi
|
|||||||
mPkt_wpending(NULL), mPkt_wpending_size(0),
|
mPkt_wpending(NULL), mPkt_wpending_size(0),
|
||||||
mTotalRead(0), mTotalSent(0),
|
mTotalRead(0), mTotalSent(0),
|
||||||
mCurrRead(0), mCurrSent(0),
|
mCurrRead(0), mCurrSent(0),
|
||||||
mAvgReadCount(0), mAvgSentCount(0)
|
mAvgReadCount(0), mAvgSentCount(0),
|
||||||
|
mAvgDtOut(0), mAvgDtIn(0)
|
||||||
{
|
{
|
||||||
|
|
||||||
// 100 B/s (minimal)
|
// 100 B/s (minimal)
|
||||||
@ -114,8 +117,7 @@ pqistreamer::pqistreamer(RsSerialiser *rss, const RsPeerId& id, BinInterface *bi
|
|||||||
mAcceptsPacketSlicing = false ; // by default. Will be turned into true when everyone's ready.
|
mAcceptsPacketSlicing = false ; // by default. Will be turned into true when everyone's ready.
|
||||||
mLastSentPacketSlicingProbe = 0 ;
|
mLastSentPacketSlicingProbe = 0 ;
|
||||||
|
|
||||||
mAvgLastUpdate = time(NULL);
|
mAvgLastUpdate = mCurrSentTS = mCurrReadTS = getCurrentTS();
|
||||||
mCurrSentTS = mCurrReadTS = getCurrentTS();
|
|
||||||
|
|
||||||
mIncomingSize = 0 ;
|
mIncomingSize = 0 ;
|
||||||
|
|
||||||
@ -231,47 +233,46 @@ void pqistreamer::setRate(bool b,float f)
|
|||||||
|
|
||||||
void pqistreamer::updateRates()
|
void pqistreamer::updateRates()
|
||||||
{
|
{
|
||||||
// now update rates both ways.
|
// update rates both ways.
|
||||||
|
|
||||||
time_t t = time(NULL); // get current timestep.
|
double t = getCurrentTS(); // get current timestamp.
|
||||||
int64_t diff ;
|
double diff ;
|
||||||
|
|
||||||
{
|
{
|
||||||
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
|
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
|
||||||
|
diff = t - mAvgLastUpdate ;
|
||||||
|
}
|
||||||
|
|
||||||
diff = int64_t(t) - int64_t(mAvgLastUpdate) ;
|
if (diff > PQISTREAM_AVG_PERIOD)
|
||||||
}
|
{
|
||||||
|
float avgReadpSec = PQISTREAM_AVG_FRAC * getRate(true ) + (1.0 - PQISTREAM_AVG_FRAC) * mAvgReadCount/(1024.0 * diff);
|
||||||
if (diff > PQISTREAM_AVG_PERIOD)
|
float avgSentpSec = PQISTREAM_AVG_FRAC * getRate(false) + (1.0 - PQISTREAM_AVG_FRAC) * mAvgSentCount/(1024.0 * diff);
|
||||||
{
|
|
||||||
float avgReadpSec = getRate(true ) * PQISTREAM_AVG_FRAC + (1.0 - PQISTREAM_AVG_FRAC) * mAvgReadCount/(1024.0 * float(diff));
|
|
||||||
float avgSentpSec = getRate(false) * PQISTREAM_AVG_FRAC + (1.0 - PQISTREAM_AVG_FRAC) * mAvgSentCount/(1024.0 * float(diff));
|
|
||||||
|
|
||||||
#ifdef DEBUG_PQISTREAMER
|
#ifdef DEBUG_PQISTREAMER
|
||||||
std::cerr << "Peer " << PeerId() << ": Current speed estimates: " << avgReadpSec << " / " << avgSentpSec << std::endl;
|
uint64_t t_now = 1000 * getCurrentTS();
|
||||||
|
std::cerr << std::dec << t_now << " DEBUG_PQISTREAMER pqistreamer::updateRates PeerId " << this->PeerId().toStdString() << " Current speed estimates: down " << std::dec << (int)(1024 * avgReadpSec) << " B/s / up " << (int)(1024 * avgSentpSec) << " B/s" << std::endl;
|
||||||
#endif
|
#endif
|
||||||
/* pretend our rate is zero if we are
|
|
||||||
* not bandwidthLimited().
|
|
||||||
*/
|
|
||||||
if (mBio->bandwidthLimited())
|
|
||||||
{
|
|
||||||
setRate(true, avgReadpSec);
|
|
||||||
setRate(false, avgSentpSec);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
std::cerr << "Warning: setting to 0" << std::endl;
|
|
||||||
setRate(true, 0);
|
|
||||||
setRate(false, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
// now store the new rates, zero meaning that we are not bandwidthLimited()
|
||||||
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
|
|
||||||
mAvgLastUpdate = t;
|
if (mBio->bandwidthLimited())
|
||||||
mAvgReadCount = 0;
|
{
|
||||||
mAvgSentCount = 0;
|
setRate(true, avgReadpSec);
|
||||||
}
|
setRate(false, avgSentpSec);
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
setRate(true, 0);
|
||||||
|
setRate(false, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
|
||||||
|
mAvgLastUpdate = t;
|
||||||
|
mAvgReadCount = 0;
|
||||||
|
mAvgSentCount = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int pqistreamer::tick_bio()
|
int pqistreamer::tick_bio()
|
||||||
@ -1114,12 +1115,11 @@ float pqistreamer::outTimeSlice_locked()
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// very simple.....
|
|
||||||
int pqistreamer::outAllowedBytes_locked()
|
int pqistreamer::outAllowedBytes_locked()
|
||||||
{
|
{
|
||||||
double t = getCurrentTS() ; // Grabs today's time in sec, with ms accuracy. Allows a much more accurate allocation of bw
|
double t = getCurrentTS() ; // in sec, with high accuracy
|
||||||
|
|
||||||
/* allow a lot if not bandwidthLimited */
|
// allow a lot if not bandwidthLimited()
|
||||||
if (!mBio->bandwidthLimited())
|
if (!mBio->bandwidthLimited())
|
||||||
{
|
{
|
||||||
mCurrSent = 0;
|
mCurrSent = 0;
|
||||||
@ -1127,70 +1127,80 @@ int pqistreamer::outAllowedBytes_locked()
|
|||||||
return PQISTREAM_ABS_MAX;
|
return PQISTREAM_ABS_MAX;
|
||||||
}
|
}
|
||||||
|
|
||||||
double dt = t - mCurrSentTS;
|
// dt is the time elapsed since the last round of sending data
|
||||||
|
double dt = t - mCurrSentTS;
|
||||||
|
|
||||||
// limiter -> for when currSentTs -> 0.
|
// ignore cases where dt > 1s
|
||||||
if (dt > 5)
|
if (dt > 1)
|
||||||
dt = 5;
|
dt = 1;
|
||||||
|
|
||||||
double maxout = getMaxRate(false) * 1024.0;
|
// low pass filter on mAvgDtOut
|
||||||
|
mAvgDtOut = PQISTREAM_AVG_DT_FRAC * mAvgDtOut + (1 - PQISTREAM_AVG_DT_FRAC) * dt;
|
||||||
|
|
||||||
|
double maxout = getMaxRate(false) * 1024.0;
|
||||||
|
|
||||||
mCurrSent -= int(dt * maxout);
|
// this is used to take into account a possible excess of data sent during the previous round
|
||||||
|
mCurrSent -= int(dt * maxout);
|
||||||
|
|
||||||
if (mCurrSent < 0)
|
if (mCurrSent < 0)
|
||||||
mCurrSent = 0;
|
mCurrSent = 0;
|
||||||
|
|
||||||
mCurrSentTS = t;
|
mCurrSentTS = t;
|
||||||
|
|
||||||
|
// now calculate the max amount of data allowed to be sent during the next round
|
||||||
|
// we limit this quota to what should be sent at most during mAvgDtOut, taking into account the excess of data possibly sent during the previous round
|
||||||
|
double quota = mAvgDtOut * maxout - mCurrSent;
|
||||||
|
|
||||||
#ifdef DEBUG_PQISTREAMER
|
#ifdef DEBUG_PQISTREAMER
|
||||||
{
|
uint64_t t_now = 1000 * getCurrentTS();
|
||||||
std::string out;
|
std::cerr << std::dec << t_now << " DEBUG_PQISTREAMER pqistreamer::outAllowedBytes_locked PeerId " << this->PeerId().toStdString() << " dt " << (int)(1000 * dt) << "ms, mAvgDtOut " << (int)(1000 * mAvgDtOut) << "ms, maxout " << (int)(maxout) << " bytes/s, mCurrSent " << mCurrSent << " bytes, quota " << (int)(quota) << " bytes" << std::endl;
|
||||||
rs_sprintf(out, "pqistreamer::outAllowedBytes() is %d/%d", maxout - mCurrSent, maxout);
|
|
||||||
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out);
|
|
||||||
}
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
return quota;
|
||||||
return maxout - mCurrSent;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int pqistreamer::inAllowedBytes_locked()
|
int pqistreamer::inAllowedBytes_locked()
|
||||||
{
|
{
|
||||||
double t = getCurrentTS(); // in secs, with a ms accuracy
|
double t = getCurrentTS(); // in sec, with high accuracy
|
||||||
|
|
||||||
/* allow a lot if not bandwidthLimited */
|
// allow a lot if not bandwidthLimited()
|
||||||
if (!mBio->bandwidthLimited())
|
if (!mBio->bandwidthLimited())
|
||||||
{
|
{
|
||||||
|
mCurrRead = 0;
|
||||||
mCurrReadTS = t;
|
mCurrReadTS = t;
|
||||||
mCurrRead = 0;
|
|
||||||
return PQISTREAM_ABS_MAX;
|
return PQISTREAM_ABS_MAX;
|
||||||
}
|
}
|
||||||
|
|
||||||
double dt = t - mCurrReadTS;
|
// dt is the time elapsed since the last round of receiving data
|
||||||
|
double dt = t - mCurrReadTS;
|
||||||
|
|
||||||
// limiter -> for when currReadTs -> 0.
|
// limit dt to 1s
|
||||||
if (dt > 5)
|
if (dt > 1)
|
||||||
dt = 5;
|
dt = 1;
|
||||||
|
|
||||||
double maxin = getMaxRate(true) * 1024.0;
|
// low pass filter on mAvgDtIn
|
||||||
|
mAvgDtIn = PQISTREAM_AVG_DT_FRAC * mAvgDtIn + (1 - PQISTREAM_AVG_DT_FRAC) * dt;
|
||||||
|
|
||||||
mCurrRead -= int(dt * maxin);
|
double maxin = getMaxRate(true) * 1024.0;
|
||||||
|
|
||||||
|
// this is used to take into account a possible excess of data received during the previous round
|
||||||
|
mCurrRead -= int(dt * maxin);
|
||||||
|
|
||||||
if (mCurrRead < 0)
|
if (mCurrRead < 0)
|
||||||
mCurrRead = 0;
|
mCurrRead = 0;
|
||||||
|
|
||||||
mCurrReadTS = t;
|
mCurrReadTS = t;
|
||||||
|
|
||||||
|
// now calculate the max amount of data allowed to be received during the next round
|
||||||
|
// we limit this quota to what should be received at most during mAvgDtOut, taking into account the excess of data possibly received during the previous round
|
||||||
|
double quota = mAvgDtIn * maxin - mCurrRead;
|
||||||
|
|
||||||
#ifdef DEBUG_PQISTREAMER
|
#ifdef DEBUG_PQISTREAMER
|
||||||
{
|
uint64_t t_now = 1000 * getCurrentTS();
|
||||||
std::string out;
|
std::cerr << std::dec << t_now << " DEBUG_PQISTREAMER pqistreamer::inAllowedBytes_locked PeerId " << this->PeerId().toStdString() << " dt " << (int)(1000 * dt) << "ms, mAvgDtIn " << (int)(1000 * mAvgDtIn) << "ms, maxin " << (int)(maxin) << " bytes/s, mCurrRead " << mCurrRead << " bytes, quota " << (int)(quota) << " bytes" << std::endl;
|
||||||
rs_sprintf(out, "pqistreamer::inAllowedBytes() is %d/%d", maxin - mCurrRead, maxin);
|
|
||||||
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out);
|
|
||||||
}
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
return quota;
|
||||||
return maxin - mCurrRead;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -162,10 +162,13 @@ class pqistreamer: public PQInterface
|
|||||||
double mCurrReadTS; // TS from which these are measured.
|
double mCurrReadTS; // TS from which these are measured.
|
||||||
double mCurrSentTS;
|
double mCurrSentTS;
|
||||||
|
|
||||||
time_t mAvgLastUpdate; // TS from which these are measured.
|
double mAvgLastUpdate; // TS from which these are measured.
|
||||||
uint32_t mAvgReadCount;
|
uint32_t mAvgReadCount;
|
||||||
uint32_t mAvgSentCount;
|
uint32_t mAvgSentCount;
|
||||||
|
|
||||||
|
double mAvgDtOut; // average time diff between 2 rounds of sending data
|
||||||
|
double mAvgDtIn; // average time diff between 2 rounds of receiving data
|
||||||
|
|
||||||
time_t mLastIncomingTs;
|
time_t mLastIncomingTs;
|
||||||
|
|
||||||
// traffic statistics
|
// traffic statistics
|
||||||
|
Loading…
Reference in New Issue
Block a user