Revert "Revert "bandwidth control improvement""

This commit is contained in:
csoler 2020-05-25 20:28:50 +02:00 committed by GitHub
parent d87abb45e3
commit b294a34d11
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -42,39 +42,19 @@ using std::dec;
#include <sys/timeb.h>
#endif
//#define PQI_HDL_DEBUG_UR 1
#ifdef PQI_HDL_DEBUG_UR
static double getCurrentTS()
{
#ifndef WINDOWS_SYS
struct timeval cts_tmp;
gettimeofday(&cts_tmp, NULL);
double cts = (cts_tmp.tv_sec) + ((double) cts_tmp.tv_usec) / 1000000.0;
#else
struct _timeb timebuf;
_ftime( &timebuf);
double cts = (timebuf.time) + ((double) timebuf.millitm) / 1000.0;
#endif
return cts;
}
#endif
struct RsLog::logInfo pqihandlerzoneInfo = {RsLog::Default, "pqihandler"};
#define pqihandlerzone &pqihandlerzoneInfo
//static const int PQI_HANDLER_NB_PRIORITY_LEVELS = 10 ;
//static const float PQI_HANDLER_NB_PRIORITY_RATIO = 2 ;
/****
#define DEBUG_TICK 1
#define RSITEM_DEBUG 1
****/
//#define UPDATE_RATES_DEBUG 1
// #define DEBUG_TICK 1
// #define RSITEM_DEBUG 1
pqihandler::pqihandler() : coreMtx("pqihandler")
{
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
RS_STACK_MUTEX(coreMtx); /**************** LOCKED MUTEX ****************/
// setup minimal total+individual rates.
rateIndiv_out = 0.01;
@ -97,7 +77,7 @@ int pqihandler::tick()
int moreToTick = 0;
{
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
RS_STACK_MUTEX(coreMtx); /**************** LOCKED MUTEX ****************/
// tick all interfaces...
std::map<RsPeerId, SearchModule *>::iterator it;
@ -127,9 +107,13 @@ int pqihandler::tick()
if(now > mLastRateCapUpdate + 5)
{
std::map<RsPeerId, RsConfigDataRates> rateMap;
std::map<RsPeerId, RsConfigDataRates>::iterator it;
// every 5 secs, update the max rates for all modules
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
RS_STACK_MUTEX(coreMtx); /**************** LOCKED MUTEX ****************/
for(std::map<RsPeerId, SearchModule *>::iterator it = mods.begin(); it != mods.end(); ++it)
{
// This is rather inelegant, but pqihandler has searchModules that are dynamically allocated, so the max rates
@ -149,7 +133,7 @@ int pqihandler::tick()
bool pqihandler::queueOutRsItem(RsItem *item)
{
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
RS_STACK_MUTEX(coreMtx); /**************** LOCKED MUTEX ****************/
uint32_t size ;
locked_HandleRsItem(item, size);
@ -166,7 +150,7 @@ bool pqihandler::queueOutRsItem(RsItem *item)
int pqihandler::status()
{
std::map<RsPeerId, SearchModule *>::iterator it;
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
RS_STACK_MUTEX(coreMtx); /**************** LOCKED MUTEX ****************/
{ // for output
std::string out = "pqihandler::status() Active Modules:\n";
@ -192,7 +176,7 @@ int pqihandler::status()
bool pqihandler::AddSearchModule(SearchModule *mod)
{
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
RS_STACK_MUTEX(coreMtx); /**************** LOCKED MUTEX ****************/
// if peerid used -> error.
//std::map<RsPeerId, SearchModule *>::iterator it;
if (mod->peerid != mod->pqi->PeerId())
@ -223,7 +207,7 @@ bool pqihandler::AddSearchModule(SearchModule *mod)
bool pqihandler::RemoveSearchModule(SearchModule *mod)
{
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
RS_STACK_MUTEX(coreMtx); /**************** LOCKED MUTEX ****************/
std::map<RsPeerId, SearchModule *>::iterator it;
for(it = mods.begin(); it != mods.end(); ++it)
{
@ -313,7 +297,7 @@ int pqihandler::ExtractRates(std::map<RsPeerId, RsBwRates> &ratemap, RsBwRat
total.mQueueOut = 0;
/* Lock once rates have been retrieved */
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
RS_STACK_MUTEX(coreMtx); /**************** LOCKED MUTEX ****************/
std::map<RsPeerId, SearchModule *>::iterator it;
for(it = mods.begin(); it != mods.end(); ++it)
@ -340,10 +324,6 @@ int pqihandler::ExtractRates(std::map<RsPeerId, RsBwRates> &ratemap, RsBwRat
// internal fn to send updates
int pqihandler::UpdateRates()
{
#ifdef PQI_HDL_DEBUG_UR
uint64_t t_now;
#endif
std::map<RsPeerId, SearchModule *>::iterator it;
float avail_in = getMaxRate(true);
@ -353,18 +333,15 @@ int pqihandler::UpdateRates()
float used_bw_out = 0;
/* Lock once rates have been retrieved */
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
RS_STACK_MUTEX(coreMtx); /**************** LOCKED MUTEX ****************/
int num_sm = mods.size();
float used_bw_in_table[num_sm]; /* table of in bandwidth currently used by each module */
float used_bw_out_table[num_sm]; /* table of out bandwidth currently used by each module */
int effectiveUploadsSm = 0;
int effectiveDownloadsSm = 0;
// loop through modules to get the used bandwith and the number of modules that are affectively transfering
#ifdef PQI_HDL_DEBUG_UR
std::cerr << "Looping through modules" << std::endl;
// loop through modules to get the used bandwidth
#ifdef UPDATE_RATES_DEBUG
RsDbg() << "UPDATE_RATES pqihandler::UpdateRates Looping through modules" << std::endl;
#endif
int index = 0;
@ -372,49 +349,33 @@ int pqihandler::UpdateRates()
for(it = mods.begin(); it != mods.end(); ++it)
{
SearchModule *mod = (it -> second);
float crate_in = mod -> pqi -> getRate(true);
traffInSum += mod -> pqi -> getTraffic(true);
traffOutSum += mod -> pqi -> getTraffic(false);
#ifdef PQI_HDL_DEBUG_UR
if(crate_in > 0.0)
std::cerr << " got in rate for peer " << it->first << " : " << crate_in << std::endl;
#endif
if ((crate_in > 0.01 * avail_in) || (crate_in > 0.1))
{
++effectiveDownloadsSm;
}
float crate_in = mod -> pqi -> getRate(true);
float crate_out = mod -> pqi -> getRate(false);
if ((crate_out > 0.01 * avail_out) || (crate_out > 0.1))
{
++effectiveUploadsSm;
}
used_bw_in += crate_in;
used_bw_out += crate_out;
/* fill the table of bandwidth */
/* fill the table of used bandwidths */
used_bw_in_table[index] = crate_in;
used_bw_out_table[index] = crate_out;
++index;
}
#ifdef PQI_HDL_DEBUG_UR
t_now = 1000 * getCurrentTS();
std::cerr << dec << t_now << " pqihandler::UpdateRates(): Sorting used_bw_out_table: " << num_sm << " entries" << std::endl;
#ifdef UPDATE_RATES_DEBUG
RsDbg() << "UPDATE_RATES pqihandler::UpdateRates Sorting used_bw_out_table: " << num_sm << " entries" << std::endl;
#endif
/* Sort the used bw in/out table in ascending order */
std::sort(used_bw_in_table, used_bw_in_table + num_sm);
std::sort(used_bw_out_table, used_bw_out_table + num_sm);
#ifdef PQI_HDL_DEBUG_UR
t_now = 1000 * getCurrentTS();
std::cerr << dec << t_now << " pqihandler::UpdateRates(): Done." << std::endl;
std::cerr << dec << t_now << " pqihandler::UpdateRates(): used_bw_out " << used_bw_out << std::endl;
#ifdef UPDATE_RATES_DEBUG
RsDbg() << "UPDATE_RATES pqihandler::UpdateRates used_bw_out " << used_bw_out << std::endl;
#endif
/* Calculate the optimal out_max value, taking into account avail_out and the out bw requested by modules */
@ -441,9 +402,8 @@ int pqihandler::UpdateRates()
}
}
#ifdef PQI_HDL_DEBUG_UR
t_now = 1000 * getCurrentTS();
std::cerr << dec << t_now << " pqihandler::UpdateRates(): mod_index " << mod_index << " out_max_bw " << out_max_bw << " remaining out bw " << out_remaining_bw << std::endl;
#ifdef UPDATE_RATES_DEBUG
RsDbg() << "UPDATE_RATES pqihandler::UpdateRates mod_index " << mod_index << " out_max_bw " << out_max_bw << " remaining out bw " << out_remaining_bw << std::endl;
#endif
/* Allocate only half the remaining out bw, if any, to make it smoother */
@ -473,67 +433,70 @@ int pqihandler::UpdateRates()
}
}
#ifdef PQI_HDL_DEBUG_UR
t_now = 1000 * getCurrentTS();
std::cerr << dec << t_now << " pqihandler::UpdateRates(): mod_index " << mod_index << " in_max_bw " << in_max_bw << " remaining in bw " << in_remaining_bw << std::endl;
#ifdef UPDATE_RATES_DEBUG
RsDbg() << "UPDATE_RATES pqihandler::UpdateRates mod_index " << mod_index << " in_max_bw " << in_max_bw << " remaining in bw " << in_remaining_bw << std::endl;
#endif
/* Allocate only half the remaining in bw, if any, to make it smoother */
in_max_bw = in_max_bw + in_remaining_bw / 2;
#ifdef DEBUG_QOS
// std::cerr << "Totals (In) Used B/W " << used_bw_in;
// std::cerr << " Available B/W " << avail_in;
// std::cerr << " Effective transfers " << effectiveDownloadsSm << std::endl;
// std::cerr << "Totals (Out) Used B/W " << used_bw_out;
// std::cerr << " Available B/W " << avail_out;
// std::cerr << " Effective transfers " << effectiveUploadsSm << std::endl;
#endif
// store current total in and ou used bw
locked_StoreCurrentRates(used_bw_in, used_bw_out);
//computing average rates for effective transfers
float max_in_effective = avail_in / num_sm;
if (effectiveDownloadsSm != 0) {
max_in_effective = avail_in / effectiveDownloadsSm;
}
float max_out_effective = avail_out / num_sm;
if (effectiveUploadsSm != 0) {
max_out_effective = avail_out / effectiveUploadsSm;
}
//modify the in and out limit
#ifdef PQI_HDL_DEBUG_UR
t_now = 1000 * getCurrentTS();
std::cerr << dec << t_now << " pqihandler::UpdateRates(): setting new out_max " << out_max_bw << " in_max " << in_max_bw << std::endl;
#ifdef UPDATE_RATES_DEBUG
RsDbg() << "UPDATE_RATES pqihandler::UpdateRates setting new out_max " << out_max_bw << " in_max " << in_max_bw << std::endl;
#endif
// retrieve down (from peer point of view) bandwidth limits set by peers in their own settings
std::map<RsPeerId, RsConfigDataRates> rateMap;
rsConfig->getAllBandwidthRates(rateMap);
std::map<RsPeerId, RsConfigDataRates>::iterator rateMap_it;
#ifdef UPDATE_RATES_DEBUG
// Dump RsConfigurationDataRates
RsDbg() << "UPDATE_RATES pqihandler::UpdateRates RsConfigDataRates dump" << std::endl;
for (rateMap_it = rateMap.begin(); rateMap_it != rateMap.end(); rateMap_it++)
RsDbg () << "UPDATE_RATES pqihandler::UpdateRates PeerId " << rateMap_it->first.toStdString() << " mAllowedOut " << rateMap_it->second.mAllowedOut << std::endl;
#endif
// update max rates taking into account the limits set by peers in their own settings
for(it = mods.begin(); it != mods.end(); ++it)
{
SearchModule *mod = (it -> second);
mod -> pqi -> setMaxRate(true, in_max_bw);
mod -> pqi -> setMaxRate(false, out_max_bw);
// for our down bandwidth we set the max to the calculated value without taking into account the max set by peers: they will control their up bw on their side
mod -> pqi -> setMaxRate(true, in_max_bw);
// for our up bandwidth we limit to the maximum down bw provided by peers via BwCtrl because we don't want to clog our outqueues, the SSL buffers, and our friends inbound queues
if ((rateMap_it = rateMap.find(mod->pqi->PeerId())) != rateMap.end())
{
if (rateMap_it->second.mAllowedOut > 0)
{
if (out_max_bw > rateMap_it->second.mAllowedOut)
mod -> pqi -> setMaxRate(false, rateMap_it->second.mAllowedOut);
else
mod -> pqi -> setMaxRate(false, out_max_bw);
}
else
mod -> pqi -> setMaxRate(false, out_max_bw);
}
}
//cap the rates
#ifdef UPDATE_RATES_DEBUG
// dump maxRates
for(it = mods.begin(); it != mods.end(); ++it)
{
SearchModule *mod = (it -> second);
if (mod -> pqi -> getMaxRate(false) < max_out_effective) mod -> pqi -> setMaxRate(false, max_out_effective);
if (mod -> pqi -> getMaxRate(false) > avail_out) mod -> pqi -> setMaxRate(false, avail_out);
if (mod -> pqi -> getMaxRate(true) < max_in_effective) mod -> pqi -> setMaxRate(true, max_in_effective);
if (mod -> pqi -> getMaxRate(true) > avail_in) mod -> pqi -> setMaxRate(true, avail_in);
RsDbg() << "UPDATE_RATES pqihandler::UpdateRates PeerID " << (mod ->pqi -> PeerId()).toStdString() << " new bandwidth limits up " << mod -> pqi -> getMaxRate(false) << " down " << mod -> pqi -> getMaxRate(true) << std::endl;
}
#endif
return 1;
}
void pqihandler::getCurrentRates(float &in, float &out)
{
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
RS_STACK_MUTEX(coreMtx); /**************** LOCKED MUTEX ****************/
in = rateTotal_in;
out = rateTotal_out;