diff --git a/libretroshare/src/pqi/pqihandler.cc b/libretroshare/src/pqi/pqihandler.cc index 60f23c914..ccb826cd3 100644 --- a/libretroshare/src/pqi/pqihandler.cc +++ b/libretroshare/src/pqi/pqihandler.cc @@ -29,6 +29,30 @@ #include "util/rsstring.h" #include #include + +using std::dec; + +#include +#include +#ifdef WINDOWS_SYS +#include +#endif + +static double getCurrentTS() +{ + +#ifndef WINDOWS_SYS + struct timeval cts_tmp; + gettimeofday(&cts_tmp, NULL); + double cts = (cts_tmp.tv_sec) + ((double) cts_tmp.tv_usec) / 1000000.0; +#else + struct _timeb timebuf; + _ftime( &timebuf); + double cts = (timebuf.time) + ((double) timebuf.millitm) / 1000.0; +#endif + return cts; +} + const int pqihandlerzone = 34283; static const int PQI_HANDLER_NB_PRIORITY_LEVELS = 10 ; @@ -60,7 +84,7 @@ int pqihandler::tick() { int moreToTick = 0; - { + { RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ // tick all interfaces... @@ -206,10 +230,10 @@ int pqihandler::locked_HandleRsItem(RsItem *item, uint32_t& computed_size) { computed_size = 0 ; std::map::iterator it; - pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, + pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, "pqihandler::HandleRsItem()"); - pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, + pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, "pqihandler::HandleRsItem() Sending to One Channel"); #ifdef DEBUG_TICK std::cerr << "pqihandler::HandleRsItem() Sending to One Channel" << std::endl; @@ -245,7 +269,7 @@ int pqihandler::SendRsRawItem(RsRawItem *ns) pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, "pqihandler::SendRsRawItem()"); // directly send item to streamers - + return queueOutRsItem(ns) ; } @@ -274,7 +298,7 @@ int pqihandler::locked_GetItems() SearchModule *mod = (it -> second); // check security... is output allowed. - if(0 < secpolicy_check((it -> second) -> sp, + if(0 < secpolicy_check((it -> second) -> sp, 0, PQI_INCOMING)) // PQI_ITEM_TYPE_ITEM, PQI_INCOMING)) { // if yes... attempt to read. @@ -289,19 +313,19 @@ int pqihandler::locked_GetItems() std::cerr << std::endl; } -#ifdef RSITEM_DEBUG +#ifdef RSITEM_DEBUG std::string out; rs_sprintf(out, "pqihandler::GetItems() Incoming Item from: %p\n", mod -> pqi); item -> print_string(out); - pqioutput(PQL_DEBUG_BASIC, + pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, out); #endif if (item->PeerId() != (mod->pqi)->PeerId()) { /* ERROR */ - pqioutput(PQL_ALERT, + pqioutput(PQL_ALERT, pqihandlerzone, "ERROR PeerIds dont match!"); item->PeerId(mod->pqi->PeerId()); } @@ -338,7 +362,7 @@ void pqihandler::locked_SortnStoreItem(RsItem *item) /* whole Version reserved for SERVICES/CACHES */ if (vers == RS_PKT_VERSION_SERVICE) { - pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, + pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, "SortnStore -> Service"); in_service.push_back(item); item = NULL; @@ -440,10 +464,16 @@ int pqihandler::ExtractRates(std::map &ratemap, RsBwRat -// internal fn to send updates +// internal fn to send updates int pqihandler::UpdateRates() { +#define PQI_HDL_DEBUG_UR = 1 +#ifdef PQI_HDL_DEBUG_UR + uint64_t t_now; +#endif + std::map::iterator it; + int num_sm = mods.size(); float avail_in = getMaxRate(true); @@ -451,32 +481,126 @@ int pqihandler::UpdateRates() float used_bw_in = 0; float used_bw_out = 0; + float used_bw_in_table[num_sm]; /* table of in bandwidth currently used by each module */ + float used_bw_out_table[num_sm]; /* table of out bandwidth currently used by each module */ /* Lock once rates have been retrieved */ RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ int effectiveUploadsSm = 0; int effectiveDownloadsSm = 0; + // loop through modules to get the used bandwith and the number of modules that are affectively transfering - //std::cerr << " Looping through modules" << std::endl; +#ifdef PQI_HDL_DEBUG_UR + std::cerr << " Looping through modules" << std::endl; +#endif + + int index = 0; + for(it = mods.begin(); it != mods.end(); ++it) { SearchModule *mod = (it -> second); float crate_in = mod -> pqi -> getRate(true); - if (crate_in > 0.01 * avail_in || crate_in > 0.1) + if ((crate_in > 0.01 * avail_in) || (crate_in > 0.1)) { ++effectiveDownloadsSm; } float crate_out = mod -> pqi -> getRate(false); - if (crate_out > 0.01 * avail_out || crate_out > 0.1) + if ((crate_out > 0.01 * avail_out) || (crate_out > 0.1)) { ++effectiveUploadsSm; } used_bw_in += crate_in; used_bw_out += crate_out; + + /* fill the table of bandwidth */ + used_bw_in_table[index] = crate_in; + used_bw_out_table[index] = crate_out; + ++index; } + +#ifdef PQI_HDL_DEBUG_UR + t_now = 1000 * getCurrentTS(); + std::cerr << dec << t_now << " pqihandler::UpdateRates(): Sorting used_bw_out_table: " << num_sm << " entries" << std::endl; +#endif + + /* Sort the used bw in/out table in ascending order */ + std::sort(used_bw_in_table, used_bw_in_table + num_sm); + std::sort(used_bw_out_table, used_bw_out_table + num_sm); + +#ifdef PQI_HDL_DEBUG_UR + t_now = 1000 * getCurrentTS(); + std::cerr << dec << t_now << " pqihandler::UpdateRates(): Done." << std::endl; + std::cerr << dec << t_now << " pqihandler::UpdateRates(): used_bw_out " << used_bw_out << std::endl; +#endif + + /* Calculate the optimal out_max value, taking into account avail_out and the out bw requested by modules */ + + float out_remaining_bw = avail_out; + float out_max_bw = 0; + bool keep_going = true; + int mod_index = 0; + + while (keep_going && (mod_index < num_sm)) { + float result = (num_sm - mod_index) * (used_bw_out_table[mod_index] - out_max_bw); + if (result > out_remaining_bw) { + /* There is not enough remaining out bw to satisfy all modules, + distribute the remaining out bw among modules, then exit */ + out_max_bw += out_remaining_bw / (num_sm - mod_index); + out_remaining_bw = 0; + keep_going = false; + } else { + /* Grant the requested out bandwidth to all modules, + then recalculate the remaining out bandwidth */ + out_remaining_bw -= result; + out_max_bw = used_bw_out_table[mod_index]; + ++mod_index; + } + } + +#ifdef PQI_HDL_DEBUG_UR + t_now = 1000 * getCurrentTS(); + std::cerr << dec << t_now << " pqihandler::UpdateRates(): mod_index " << mod_index << " out_max_bw " << out_max_bw << " remaining out bw " << out_remaining_bw << std::endl; +#endif + + /* Allocate only half the remaining out bw, if any, to make it smoother */ + out_max_bw = out_max_bw + out_remaining_bw / 2; + + /* Calculate the optimal in_max value, taking into account avail_in and the in bw requested by modules */ + + float in_remaining_bw = avail_in; + float in_max_bw = 0; + keep_going = true; + mod_index = 0; + + while (keep_going && mod_index < num_sm) { + float result = (num_sm - mod_index) * (used_bw_in_table[mod_index] - in_max_bw); + if (result > in_remaining_bw) { + /* There is not enough remaining in bw to satisfy all modules, + distribute the remaining in bw among modules, then exit */ + in_max_bw += in_remaining_bw / (num_sm - mod_index); + in_remaining_bw = 0; + keep_going = false; + } else { + /* Grant the requested in bandwidth to all modules, + then recalculate the remaining in bandwidth */ + in_remaining_bw -= result; + in_max_bw = used_bw_in_table[mod_index]; + ++mod_index; + } + } + +#ifdef PQI_HDL_DEBUG_UR + t_now = 1000 * getCurrentTS(); + std::cerr << dec << t_now << " pqihandler::UpdateRates(): mod_index " << mod_index << " in_max_bw " << in_max_bw << " remaining in bw " << in_remaining_bw << std::endl; +#endif + + /* Allocate only half the remaining in bw, if any, to make it smoother */ + in_max_bw = in_max_bw + in_remaining_bw / 2; + + #ifdef DEBUG_QOS // std::cerr << "Totals (In) Used B/W " << used_bw_in; // std::cerr << " Available B/W " << avail_in; @@ -498,51 +622,35 @@ int pqihandler::UpdateRates() max_out_effective = avail_out / effectiveUploadsSm; } - //modify the outgoing rates if bandwith is not used well - float rate_out_modifier = 0; - if (used_bw_out / avail_out < 0.95) { - rate_out_modifier = 0.001 * avail_out; - } else if (used_bw_out / avail_out > 1.05) { - rate_out_modifier = - 0.001 * avail_out; - } - if (rate_out_modifier != 0) { - for(it = mods.begin(); it != mods.end(); ++it) - { - SearchModule *mod = (it -> second); - mod -> pqi -> setMaxRate(false, mod -> pqi -> getMaxRate(false) + rate_out_modifier); - } + //modify the in and out limit +#ifdef PQI_HDL_DEBUG_UR + t_now = 1000 * getCurrentTS(); + std::cerr << dec << t_now << " pqihandler::UpdateRates(): setting new out_max " << out_max_bw << " in_max " << in_max_bw << std::endl; +#endif + + for(it = mods.begin(); it != mods.end(); ++it) + { + SearchModule *mod = (it -> second); + mod -> pqi -> setMaxRate(true, in_max_bw); + mod -> pqi -> setMaxRate(false, out_max_bw); } - //modify the incoming rates if bandwith is not used well - float rate_in_modifier = 0; - if (used_bw_in / avail_in < 0.95) { - rate_in_modifier = 0.001 * avail_in; - } else if (used_bw_in / avail_in > 1.05) { - rate_in_modifier = - 0.001 * avail_in; - } - if (rate_in_modifier != 0) { - for(it = mods.begin(); it != mods.end(); ++it) - { - SearchModule *mod = (it -> second); - mod -> pqi -> setMaxRate(true, mod -> pqi -> getMaxRate(true) + rate_in_modifier); - } - } //cap the rates for(it = mods.begin(); it != mods.end(); ++it) { SearchModule *mod = (it -> second); if (mod -> pqi -> getMaxRate(false) < max_out_effective) { - mod -> pqi -> setMaxRate(false, max_out_effective); + mod -> pqi -> setMaxRate(false, max_out_effective); } if (mod -> pqi -> getMaxRate(false) > avail_out) { - mod -> pqi -> setMaxRate(false, avail_out); + mod -> pqi -> setMaxRate(false, avail_out); } if (mod -> pqi -> getMaxRate(true) < max_in_effective) { - mod -> pqi -> setMaxRate(true, max_in_effective); + mod -> pqi -> setMaxRate(true, max_in_effective); } if (mod -> pqi -> getMaxRate(true) > avail_in) { - mod -> pqi -> setMaxRate(true, avail_in); + mod -> pqi -> setMaxRate(true, avail_in); } }