Merge pull request #2213 from jolavillette/bandwidthOptim

bandwith management optimization
This commit is contained in:
csoler 2021-01-12 19:35:43 +01:00 committed by GitHub
commit 7b2f6a3439
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 35 deletions

View File

@ -406,8 +406,8 @@ int pqihandler::UpdateRates()
RsDbg() << "UPDATE_RATES pqihandler::UpdateRates mod_index " << mod_index << " out_max_bw " << out_max_bw << " remaining out bw " << out_remaining_bw << std::endl; RsDbg() << "UPDATE_RATES pqihandler::UpdateRates mod_index " << mod_index << " out_max_bw " << out_max_bw << " remaining out bw " << out_remaining_bw << std::endl;
#endif #endif
/* Allocate only half the remaining out bw, if any, to make it smoother */ /* Allocate only 50 pct the remaining out bw, if any, to make the transition more smooth */
out_max_bw = out_max_bw + out_remaining_bw / 2; out_max_bw = out_max_bw + 0.5 * out_remaining_bw;
/* Calculate the optimal in_max value, taking into account avail_in and the in bw requested by modules */ /* Calculate the optimal in_max value, taking into account avail_in and the in bw requested by modules */
@ -437,37 +437,39 @@ int pqihandler::UpdateRates()
RsDbg() << "UPDATE_RATES pqihandler::UpdateRates mod_index " << mod_index << " in_max_bw " << in_max_bw << " remaining in bw " << in_remaining_bw << std::endl; RsDbg() << "UPDATE_RATES pqihandler::UpdateRates mod_index " << mod_index << " in_max_bw " << in_max_bw << " remaining in bw " << in_remaining_bw << std::endl;
#endif #endif
/* Allocate only half the remaining in bw, if any, to make it smoother */ // allocate only 75 pct of the remaining in bw, to make the transition more smooth
in_max_bw = in_max_bw + in_remaining_bw / 2; in_max_bw = in_max_bw + 0.75 * in_remaining_bw;
// store current total in and ou used bw // store current total in and out used bw
locked_StoreCurrentRates(used_bw_in, used_bw_out); locked_StoreCurrentRates(used_bw_in, used_bw_out);
#ifdef UPDATE_RATES_DEBUG #ifdef UPDATE_RATES_DEBUG
RsDbg() << "UPDATE_RATES pqihandler::UpdateRates setting new out_max " << out_max_bw << " in_max " << in_max_bw << std::endl; RsDbg() << "UPDATE_RATES pqihandler::UpdateRates setting new out_max " << out_max_bw << " in_max " << in_max_bw << std::endl;
#endif #endif
// retrieve down (from peer point of view) bandwidth limits set by peers in their own settings // retrieve the bandwidth limits provided by peers via BwCtrl
std::map<RsPeerId, RsConfigDataRates> rateMap; std::map<RsPeerId, RsConfigDataRates> rateMap;
rsConfig->getAllBandwidthRates(rateMap); rsConfig->getAllBandwidthRates(rateMap);
std::map<RsPeerId, RsConfigDataRates>::iterator rateMap_it; std::map<RsPeerId, RsConfigDataRates>::iterator rateMap_it;
#ifdef UPDATE_RATES_DEBUG #ifdef UPDATE_RATES_DEBUG
// Dump RsConfigurationDataRates // dump RsConfigurationDataRates
RsDbg() << "UPDATE_RATES pqihandler::UpdateRates RsConfigDataRates dump" << std::endl; RsDbg() << "UPDATE_RATES pqihandler::UpdateRates RsConfigDataRates dump" << std::endl;
for (rateMap_it = rateMap.begin(); rateMap_it != rateMap.end(); rateMap_it++) for (rateMap_it = rateMap.begin(); rateMap_it != rateMap.end(); rateMap_it++)
RsDbg () << "UPDATE_RATES pqihandler::UpdateRates PeerId " << rateMap_it->first.toStdString() << " mAllowedOut " << rateMap_it->second.mAllowedOut << std::endl; RsDbg () << "UPDATE_RATES pqihandler::UpdateRates PeerId " << rateMap_it->first.toStdString() << " mAllowedOut " << rateMap_it->second.mAllowedOut << std::endl;
#endif #endif
// update max rates taking into account the limits set by peers in their own settings // update max rates
for(it = mods.begin(); it != mods.end(); ++it) for(it = mods.begin(); it != mods.end(); ++it)
{ {
SearchModule *mod = (it -> second); SearchModule *mod = (it -> second);
// for our down bandwidth we set the max to the calculated value without taking into account the max set by peers: they will control their up bw on their side // for our down bandwidth we use the calculated value without taking into account the max up provided by peers via BwCtrl
// this is harmless as they will control their up bw on their side
mod -> pqi -> setMaxRate(true, in_max_bw); mod -> pqi -> setMaxRate(true, in_max_bw);
// for our up bandwidth we limit to the maximum down bw provided by peers via BwCtrl because we don't want to clog our outqueues, the SSL buffers, and our friends inbound queues // for our up bandwidth we take into account the max down provided by peers via BwCtrl
// because we don't want to clog our outqueues, the TCP buffers, and the peers inbound queues
if ((rateMap_it = rateMap.find(mod->pqi->PeerId())) != rateMap.end()) if ((rateMap_it = rateMap.find(mod->pqi->PeerId())) != rateMap.end())
{ {
if (rateMap_it->second.mAllowedOut > 0) if (rateMap_it->second.mAllowedOut > 0)

View File

@ -550,14 +550,12 @@ int pqistreamer::handleoutgoing_locked()
if ((!(mBio->cansend(0))) || (maxbytes < sentbytes)) if ((!(mBio->cansend(0))) || (maxbytes < sentbytes))
{ {
#ifdef DEBUG_PQISTREAMER
#ifdef DEBUG_PACKET_SLICING if (sentbytes > maxbytes)
if (maxbytes < sentbytes) RsDbg() << "PQISTREAMER pqistreamer::handleoutgoing_locked() stopped sending max reached, sentbytes " << std::dec << sentbytes << " maxbytes " << maxbytes;
std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending: bio not ready. maxbytes=" << maxbytes << ", sentbytes=" << sentbytes << std::endl;
else else
std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending: sentbytes=" << sentbytes << ", max=" << maxbytes << std::endl; RsDbg() << "PQISTREAMER pqistreamer::handleoutgoing_locked() stopped sending bio not ready, sentbytes " << std::dec << sentbytes << " maxbytes " << maxbytes;
#endif #endif
return 0; return 0;
} }
// send a out_pkt., else send out_data. unless there is a pending packet. The strategy is to // send a out_pkt., else send out_data. unless there is a pending packet. The strategy is to
@ -1019,12 +1017,11 @@ continue_packet:
if(maxin > readbytes && mBio->moretoread(0)) if(maxin > readbytes && mBio->moretoread(0))
goto start_packet_read ; goto start_packet_read ;
#ifdef DEBUG_TRANSFERS #ifdef DEBUG_PQISTREAMER
if (readbytes >= maxin) if (readbytes > maxin)
{ RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading max reached, readbytes " << std::dec << readbytes << " maxin " << maxin;
std::cerr << "pqistreamer::handleincoming() Stopped reading as readbytes >= maxin. Read " << readbytes << " bytes "; else
std::cerr << std::endl; RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading no more to read, readbytes " << std::dec << readbytes << " maxin " << maxin;
}
#endif #endif
return 0; return 0;
@ -1157,18 +1154,19 @@ int pqistreamer::outAllowedBytes_locked()
// this is used to take into account a possible excess of data sent during the previous round // this is used to take into account a possible excess of data sent during the previous round
mCurrSent -= int(dt * maxout); mCurrSent -= int(dt * maxout);
// we dont allow negative value, any quota not used during the previous round is therefore lost
if (mCurrSent < 0) if (mCurrSent < 0)
mCurrSent = 0; mCurrSent = 0;
mCurrSentTS = t; mCurrSentTS = t;
// now calculate the max amount of data allowed to be sent during the next round // now calculate the amount of data allowed to be sent during the next round
// we limit this quota to what should be sent at most during mAvgDtOut, taking into account the excess of data possibly sent during the previous round // we take into account the possible excess (but not deficit) of the previous round
// (this is handled differently when reading data, see below)
double quota = mAvgDtOut * maxout - mCurrSent; double quota = mAvgDtOut * maxout - mCurrSent;
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
uint64_t t_now = 1000 * getCurrentTS(); RsDbg() << "PQISTREAMER pqistreamer::outAllowedBytes_locked() dt " << std::dec << (int)(1000 * dt) << "ms, mAvgDtOut " << (int)(1000 * mAvgDtOut) << "ms, maxout " << (int)(maxout) << " bytes/s, mCurrSent " << mCurrSent << " bytes, quota " << (int)(quota) << " bytes";
std::cerr << std::dec << t_now << " DEBUG_PQISTREAMER pqistreamer::outAllowedBytes_locked PeerId " << this->PeerId().toStdString() << " dt " << (int)(1000 * dt) << "ms, mAvgDtOut " << (int)(1000 * mAvgDtOut) << "ms, maxout " << (int)(maxout) << " bytes/s, mCurrSent " << mCurrSent << " bytes, quota " << (int)(quota) << " bytes" << std::endl;
#endif #endif
return quota; return quota;
@ -1198,27 +1196,27 @@ int pqistreamer::inAllowedBytes()
double maxin = getMaxRate(true) * 1024.0; double maxin = getMaxRate(true) * 1024.0;
// this is used to take into account a possible excess of data received during the previous round // this is used to take into account a possible excess/deficit of data received during the previous round
mCurrRead -= int(dt * maxin); mCurrRead -= int(dt * maxin);
if (mCurrRead < 0) // we allow negative value up to the average amount of data received during one round
mCurrRead = 0; // in that case we will use this credit during the next around
if (mCurrRead < - mAvgDtIn * maxin)
mCurrRead = - mAvgDtIn * maxin;
mCurrReadTS = t; mCurrReadTS = t;
// now calculate the max amount of data allowed to be received during the next round // we now calculate the max amount of data allowed to be received during the next round
// we limit this quota to what should be received at most during mAvgDtOut, taking into account the excess of data possibly received during the previous round // we take into account the excess/deficit of the previous round
double quota = mAvgDtIn * maxin - mCurrRead; double quota = mAvgDtIn * maxin - mCurrRead;
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
uint64_t t_now = 1000 * getCurrentTS(); RsDbg() << "PQISTREAMER pqistreamer::inAllowedBytes() dt " << std::dec << (int)(1000 * dt) << "ms, mAvgDtIn " << (int)(1000 * mAvgDtIn) << "ms, maxin " << (int)(maxin) << " bytes/s, mCurrRead " << mCurrRead << " bytes, quota " << (int)(quota) << " bytes";
std::cerr << std::dec << t_now << " DEBUG_PQISTREAMER pqistreamer::inAllowedBytes PeerId " << this->PeerId().toStdString() << " dt " << (int)(1000 * dt) << "ms, mAvgDtIn " << (int)(1000 * mAvgDtIn) << "ms, maxin " << (int)(maxin) << " bytes/s, mCurrRead " << mCurrRead << " bytes, quota " << (int)(quota) << " bytes" << std::endl;
#endif #endif
return quota; return quota;
} }
void pqistreamer::outSentBytes_locked(uint32_t outb) void pqistreamer::outSentBytes_locked(uint32_t outb)
{ {
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER