mirror of
https://github.com/RetroShare/RetroShare.git
synced 2025-01-28 00:07:09 -05:00
Fix pqihandler::UpdateRates() to get more accurate.
This commit is contained in:
parent
00517fe68d
commit
4521e8ee95
@ -29,6 +29,30 @@
|
|||||||
#include "util/rsstring.h"
|
#include "util/rsstring.h"
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <time.h>
|
#include <time.h>
|
||||||
|
|
||||||
|
using std::dec;
|
||||||
|
|
||||||
|
#include <time.h>
|
||||||
|
#include <sys/time.h>
|
||||||
|
#ifdef WINDOWS_SYS
|
||||||
|
#include <sys/timeb.h>
|
||||||
|
#endif
|
||||||
|
|
||||||
|
static double getCurrentTS()
|
||||||
|
{
|
||||||
|
|
||||||
|
#ifndef WINDOWS_SYS
|
||||||
|
struct timeval cts_tmp;
|
||||||
|
gettimeofday(&cts_tmp, NULL);
|
||||||
|
double cts = (cts_tmp.tv_sec) + ((double) cts_tmp.tv_usec) / 1000000.0;
|
||||||
|
#else
|
||||||
|
struct _timeb timebuf;
|
||||||
|
_ftime( &timebuf);
|
||||||
|
double cts = (timebuf.time) + ((double) timebuf.millitm) / 1000.0;
|
||||||
|
#endif
|
||||||
|
return cts;
|
||||||
|
}
|
||||||
|
|
||||||
const int pqihandlerzone = 34283;
|
const int pqihandlerzone = 34283;
|
||||||
|
|
||||||
static const int PQI_HANDLER_NB_PRIORITY_LEVELS = 10 ;
|
static const int PQI_HANDLER_NB_PRIORITY_LEVELS = 10 ;
|
||||||
@ -60,7 +84,7 @@ int pqihandler::tick()
|
|||||||
{
|
{
|
||||||
int moreToTick = 0;
|
int moreToTick = 0;
|
||||||
|
|
||||||
{
|
{
|
||||||
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
|
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
|
||||||
|
|
||||||
// tick all interfaces...
|
// tick all interfaces...
|
||||||
@ -206,10 +230,10 @@ int pqihandler::locked_HandleRsItem(RsItem *item, uint32_t& computed_size)
|
|||||||
{
|
{
|
||||||
computed_size = 0 ;
|
computed_size = 0 ;
|
||||||
std::map<RsPeerId, SearchModule *>::iterator it;
|
std::map<RsPeerId, SearchModule *>::iterator it;
|
||||||
pqioutput(PQL_DEBUG_BASIC, pqihandlerzone,
|
pqioutput(PQL_DEBUG_BASIC, pqihandlerzone,
|
||||||
"pqihandler::HandleRsItem()");
|
"pqihandler::HandleRsItem()");
|
||||||
|
|
||||||
pqioutput(PQL_DEBUG_BASIC, pqihandlerzone,
|
pqioutput(PQL_DEBUG_BASIC, pqihandlerzone,
|
||||||
"pqihandler::HandleRsItem() Sending to One Channel");
|
"pqihandler::HandleRsItem() Sending to One Channel");
|
||||||
#ifdef DEBUG_TICK
|
#ifdef DEBUG_TICK
|
||||||
std::cerr << "pqihandler::HandleRsItem() Sending to One Channel" << std::endl;
|
std::cerr << "pqihandler::HandleRsItem() Sending to One Channel" << std::endl;
|
||||||
@ -245,7 +269,7 @@ int pqihandler::SendRsRawItem(RsRawItem *ns)
|
|||||||
pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, "pqihandler::SendRsRawItem()");
|
pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, "pqihandler::SendRsRawItem()");
|
||||||
|
|
||||||
// directly send item to streamers
|
// directly send item to streamers
|
||||||
|
|
||||||
return queueOutRsItem(ns) ;
|
return queueOutRsItem(ns) ;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -274,7 +298,7 @@ int pqihandler::locked_GetItems()
|
|||||||
SearchModule *mod = (it -> second);
|
SearchModule *mod = (it -> second);
|
||||||
|
|
||||||
// check security... is output allowed.
|
// check security... is output allowed.
|
||||||
if(0 < secpolicy_check((it -> second) -> sp,
|
if(0 < secpolicy_check((it -> second) -> sp,
|
||||||
0, PQI_INCOMING)) // PQI_ITEM_TYPE_ITEM, PQI_INCOMING))
|
0, PQI_INCOMING)) // PQI_ITEM_TYPE_ITEM, PQI_INCOMING))
|
||||||
{
|
{
|
||||||
// if yes... attempt to read.
|
// if yes... attempt to read.
|
||||||
@ -289,19 +313,19 @@ int pqihandler::locked_GetItems()
|
|||||||
std::cerr << std::endl;
|
std::cerr << std::endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef RSITEM_DEBUG
|
#ifdef RSITEM_DEBUG
|
||||||
std::string out;
|
std::string out;
|
||||||
rs_sprintf(out, "pqihandler::GetItems() Incoming Item from: %p\n", mod -> pqi);
|
rs_sprintf(out, "pqihandler::GetItems() Incoming Item from: %p\n", mod -> pqi);
|
||||||
item -> print_string(out);
|
item -> print_string(out);
|
||||||
|
|
||||||
pqioutput(PQL_DEBUG_BASIC,
|
pqioutput(PQL_DEBUG_BASIC,
|
||||||
pqihandlerzone, out);
|
pqihandlerzone, out);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if (item->PeerId() != (mod->pqi)->PeerId())
|
if (item->PeerId() != (mod->pqi)->PeerId())
|
||||||
{
|
{
|
||||||
/* ERROR */
|
/* ERROR */
|
||||||
pqioutput(PQL_ALERT,
|
pqioutput(PQL_ALERT,
|
||||||
pqihandlerzone, "ERROR PeerIds dont match!");
|
pqihandlerzone, "ERROR PeerIds dont match!");
|
||||||
item->PeerId(mod->pqi->PeerId());
|
item->PeerId(mod->pqi->PeerId());
|
||||||
}
|
}
|
||||||
@ -338,7 +362,7 @@ void pqihandler::locked_SortnStoreItem(RsItem *item)
|
|||||||
/* whole Version reserved for SERVICES/CACHES */
|
/* whole Version reserved for SERVICES/CACHES */
|
||||||
if (vers == RS_PKT_VERSION_SERVICE)
|
if (vers == RS_PKT_VERSION_SERVICE)
|
||||||
{
|
{
|
||||||
pqioutput(PQL_DEBUG_BASIC, pqihandlerzone,
|
pqioutput(PQL_DEBUG_BASIC, pqihandlerzone,
|
||||||
"SortnStore -> Service");
|
"SortnStore -> Service");
|
||||||
in_service.push_back(item);
|
in_service.push_back(item);
|
||||||
item = NULL;
|
item = NULL;
|
||||||
@ -440,10 +464,16 @@ int pqihandler::ExtractRates(std::map<RsPeerId, RsBwRates> &ratemap, RsBwRat
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
// internal fn to send updates
|
// internal fn to send updates
|
||||||
int pqihandler::UpdateRates()
|
int pqihandler::UpdateRates()
|
||||||
{
|
{
|
||||||
|
#define PQI_HDL_DEBUG_UR = 1
|
||||||
|
#ifdef PQI_HDL_DEBUG_UR
|
||||||
|
uint64_t t_now;
|
||||||
|
#endif
|
||||||
|
|
||||||
std::map<RsPeerId, SearchModule *>::iterator it;
|
std::map<RsPeerId, SearchModule *>::iterator it;
|
||||||
|
|
||||||
int num_sm = mods.size();
|
int num_sm = mods.size();
|
||||||
|
|
||||||
float avail_in = getMaxRate(true);
|
float avail_in = getMaxRate(true);
|
||||||
@ -451,32 +481,126 @@ int pqihandler::UpdateRates()
|
|||||||
|
|
||||||
float used_bw_in = 0;
|
float used_bw_in = 0;
|
||||||
float used_bw_out = 0;
|
float used_bw_out = 0;
|
||||||
|
float used_bw_in_table[num_sm]; /* table of in bandwidth currently used by each module */
|
||||||
|
float used_bw_out_table[num_sm]; /* table of out bandwidth currently used by each module */
|
||||||
|
|
||||||
/* Lock once rates have been retrieved */
|
/* Lock once rates have been retrieved */
|
||||||
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
|
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
|
||||||
|
|
||||||
int effectiveUploadsSm = 0;
|
int effectiveUploadsSm = 0;
|
||||||
int effectiveDownloadsSm = 0;
|
int effectiveDownloadsSm = 0;
|
||||||
|
|
||||||
// loop through modules to get the used bandwith and the number of modules that are affectively transfering
|
// loop through modules to get the used bandwith and the number of modules that are affectively transfering
|
||||||
//std::cerr << " Looping through modules" << std::endl;
|
#ifdef PQI_HDL_DEBUG_UR
|
||||||
|
std::cerr << " Looping through modules" << std::endl;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
int index = 0;
|
||||||
|
|
||||||
for(it = mods.begin(); it != mods.end(); ++it)
|
for(it = mods.begin(); it != mods.end(); ++it)
|
||||||
{
|
{
|
||||||
SearchModule *mod = (it -> second);
|
SearchModule *mod = (it -> second);
|
||||||
float crate_in = mod -> pqi -> getRate(true);
|
float crate_in = mod -> pqi -> getRate(true);
|
||||||
if (crate_in > 0.01 * avail_in || crate_in > 0.1)
|
if ((crate_in > 0.01 * avail_in) || (crate_in > 0.1))
|
||||||
{
|
{
|
||||||
++effectiveDownloadsSm;
|
++effectiveDownloadsSm;
|
||||||
}
|
}
|
||||||
|
|
||||||
float crate_out = mod -> pqi -> getRate(false);
|
float crate_out = mod -> pqi -> getRate(false);
|
||||||
if (crate_out > 0.01 * avail_out || crate_out > 0.1)
|
if ((crate_out > 0.01 * avail_out) || (crate_out > 0.1))
|
||||||
{
|
{
|
||||||
++effectiveUploadsSm;
|
++effectiveUploadsSm;
|
||||||
}
|
}
|
||||||
|
|
||||||
used_bw_in += crate_in;
|
used_bw_in += crate_in;
|
||||||
used_bw_out += crate_out;
|
used_bw_out += crate_out;
|
||||||
|
|
||||||
|
/* fill the table of bandwidth */
|
||||||
|
used_bw_in_table[index] = crate_in;
|
||||||
|
used_bw_out_table[index] = crate_out;
|
||||||
|
++index;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef PQI_HDL_DEBUG_UR
|
||||||
|
t_now = 1000 * getCurrentTS();
|
||||||
|
std::cerr << dec << t_now << " pqihandler::UpdateRates(): Sorting used_bw_out_table: " << num_sm << " entries" << std::endl;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/* Sort the used bw in/out table in ascending order */
|
||||||
|
std::sort(used_bw_in_table, used_bw_in_table + num_sm);
|
||||||
|
std::sort(used_bw_out_table, used_bw_out_table + num_sm);
|
||||||
|
|
||||||
|
#ifdef PQI_HDL_DEBUG_UR
|
||||||
|
t_now = 1000 * getCurrentTS();
|
||||||
|
std::cerr << dec << t_now << " pqihandler::UpdateRates(): Done." << std::endl;
|
||||||
|
std::cerr << dec << t_now << " pqihandler::UpdateRates(): used_bw_out " << used_bw_out << std::endl;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/* Calculate the optimal out_max value, taking into account avail_out and the out bw requested by modules */
|
||||||
|
|
||||||
|
float out_remaining_bw = avail_out;
|
||||||
|
float out_max_bw = 0;
|
||||||
|
bool keep_going = true;
|
||||||
|
int mod_index = 0;
|
||||||
|
|
||||||
|
while (keep_going && (mod_index < num_sm)) {
|
||||||
|
float result = (num_sm - mod_index) * (used_bw_out_table[mod_index] - out_max_bw);
|
||||||
|
if (result > out_remaining_bw) {
|
||||||
|
/* There is not enough remaining out bw to satisfy all modules,
|
||||||
|
distribute the remaining out bw among modules, then exit */
|
||||||
|
out_max_bw += out_remaining_bw / (num_sm - mod_index);
|
||||||
|
out_remaining_bw = 0;
|
||||||
|
keep_going = false;
|
||||||
|
} else {
|
||||||
|
/* Grant the requested out bandwidth to all modules,
|
||||||
|
then recalculate the remaining out bandwidth */
|
||||||
|
out_remaining_bw -= result;
|
||||||
|
out_max_bw = used_bw_out_table[mod_index];
|
||||||
|
++mod_index;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#ifdef PQI_HDL_DEBUG_UR
|
||||||
|
t_now = 1000 * getCurrentTS();
|
||||||
|
std::cerr << dec << t_now << " pqihandler::UpdateRates(): mod_index " << mod_index << " out_max_bw " << out_max_bw << " remaining out bw " << out_remaining_bw << std::endl;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/* Allocate only half the remaining out bw, if any, to make it smoother */
|
||||||
|
out_max_bw = out_max_bw + out_remaining_bw / 2;
|
||||||
|
|
||||||
|
/* Calculate the optimal in_max value, taking into account avail_in and the in bw requested by modules */
|
||||||
|
|
||||||
|
float in_remaining_bw = avail_in;
|
||||||
|
float in_max_bw = 0;
|
||||||
|
keep_going = true;
|
||||||
|
mod_index = 0;
|
||||||
|
|
||||||
|
while (keep_going && mod_index < num_sm) {
|
||||||
|
float result = (num_sm - mod_index) * (used_bw_in_table[mod_index] - in_max_bw);
|
||||||
|
if (result > in_remaining_bw) {
|
||||||
|
/* There is not enough remaining in bw to satisfy all modules,
|
||||||
|
distribute the remaining in bw among modules, then exit */
|
||||||
|
in_max_bw += in_remaining_bw / (num_sm - mod_index);
|
||||||
|
in_remaining_bw = 0;
|
||||||
|
keep_going = false;
|
||||||
|
} else {
|
||||||
|
/* Grant the requested in bandwidth to all modules,
|
||||||
|
then recalculate the remaining in bandwidth */
|
||||||
|
in_remaining_bw -= result;
|
||||||
|
in_max_bw = used_bw_in_table[mod_index];
|
||||||
|
++mod_index;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#ifdef PQI_HDL_DEBUG_UR
|
||||||
|
t_now = 1000 * getCurrentTS();
|
||||||
|
std::cerr << dec << t_now << " pqihandler::UpdateRates(): mod_index " << mod_index << " in_max_bw " << in_max_bw << " remaining in bw " << in_remaining_bw << std::endl;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/* Allocate only half the remaining in bw, if any, to make it smoother */
|
||||||
|
in_max_bw = in_max_bw + in_remaining_bw / 2;
|
||||||
|
|
||||||
|
|
||||||
#ifdef DEBUG_QOS
|
#ifdef DEBUG_QOS
|
||||||
// std::cerr << "Totals (In) Used B/W " << used_bw_in;
|
// std::cerr << "Totals (In) Used B/W " << used_bw_in;
|
||||||
// std::cerr << " Available B/W " << avail_in;
|
// std::cerr << " Available B/W " << avail_in;
|
||||||
@ -498,51 +622,35 @@ int pqihandler::UpdateRates()
|
|||||||
max_out_effective = avail_out / effectiveUploadsSm;
|
max_out_effective = avail_out / effectiveUploadsSm;
|
||||||
}
|
}
|
||||||
|
|
||||||
//modify the outgoing rates if bandwith is not used well
|
//modify the in and out limit
|
||||||
float rate_out_modifier = 0;
|
#ifdef PQI_HDL_DEBUG_UR
|
||||||
if (used_bw_out / avail_out < 0.95) {
|
t_now = 1000 * getCurrentTS();
|
||||||
rate_out_modifier = 0.001 * avail_out;
|
std::cerr << dec << t_now << " pqihandler::UpdateRates(): setting new out_max " << out_max_bw << " in_max " << in_max_bw << std::endl;
|
||||||
} else if (used_bw_out / avail_out > 1.05) {
|
#endif
|
||||||
rate_out_modifier = - 0.001 * avail_out;
|
|
||||||
}
|
for(it = mods.begin(); it != mods.end(); ++it)
|
||||||
if (rate_out_modifier != 0) {
|
{
|
||||||
for(it = mods.begin(); it != mods.end(); ++it)
|
SearchModule *mod = (it -> second);
|
||||||
{
|
mod -> pqi -> setMaxRate(true, in_max_bw);
|
||||||
SearchModule *mod = (it -> second);
|
mod -> pqi -> setMaxRate(false, out_max_bw);
|
||||||
mod -> pqi -> setMaxRate(false, mod -> pqi -> getMaxRate(false) + rate_out_modifier);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//modify the incoming rates if bandwith is not used well
|
|
||||||
float rate_in_modifier = 0;
|
|
||||||
if (used_bw_in / avail_in < 0.95) {
|
|
||||||
rate_in_modifier = 0.001 * avail_in;
|
|
||||||
} else if (used_bw_in / avail_in > 1.05) {
|
|
||||||
rate_in_modifier = - 0.001 * avail_in;
|
|
||||||
}
|
|
||||||
if (rate_in_modifier != 0) {
|
|
||||||
for(it = mods.begin(); it != mods.end(); ++it)
|
|
||||||
{
|
|
||||||
SearchModule *mod = (it -> second);
|
|
||||||
mod -> pqi -> setMaxRate(true, mod -> pqi -> getMaxRate(true) + rate_in_modifier);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//cap the rates
|
//cap the rates
|
||||||
for(it = mods.begin(); it != mods.end(); ++it)
|
for(it = mods.begin(); it != mods.end(); ++it)
|
||||||
{
|
{
|
||||||
SearchModule *mod = (it -> second);
|
SearchModule *mod = (it -> second);
|
||||||
if (mod -> pqi -> getMaxRate(false) < max_out_effective) {
|
if (mod -> pqi -> getMaxRate(false) < max_out_effective) {
|
||||||
mod -> pqi -> setMaxRate(false, max_out_effective);
|
mod -> pqi -> setMaxRate(false, max_out_effective);
|
||||||
}
|
}
|
||||||
if (mod -> pqi -> getMaxRate(false) > avail_out) {
|
if (mod -> pqi -> getMaxRate(false) > avail_out) {
|
||||||
mod -> pqi -> setMaxRate(false, avail_out);
|
mod -> pqi -> setMaxRate(false, avail_out);
|
||||||
}
|
}
|
||||||
if (mod -> pqi -> getMaxRate(true) < max_in_effective) {
|
if (mod -> pqi -> getMaxRate(true) < max_in_effective) {
|
||||||
mod -> pqi -> setMaxRate(true, max_in_effective);
|
mod -> pqi -> setMaxRate(true, max_in_effective);
|
||||||
}
|
}
|
||||||
if (mod -> pqi -> getMaxRate(true) > avail_in) {
|
if (mod -> pqi -> getMaxRate(true) > avail_in) {
|
||||||
mod -> pqi -> setMaxRate(true, avail_in);
|
mod -> pqi -> setMaxRate(true, avail_in);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user