From 4521e8ee95ca52ede79b833bbd82249b1666a79a Mon Sep 17 00:00:00 2001
From: Phenom <retrosharephenom@gmail.com>
Date: Wed, 23 Dec 2015 15:49:05 +0100
Subject: [PATCH] Fix pqihandler::UpdateRates() to get more accurate.

---
 libretroshare/src/pqi/pqihandler.cc | 196 +++++++++++++++++++++-------
 1 file changed, 152 insertions(+), 44 deletions(-)

diff --git a/libretroshare/src/pqi/pqihandler.cc b/libretroshare/src/pqi/pqihandler.cc
index 60f23c914..ccb826cd3 100644
--- a/libretroshare/src/pqi/pqihandler.cc
+++ b/libretroshare/src/pqi/pqihandler.cc
@@ -29,6 +29,30 @@
 #include "util/rsstring.h"
 #include <stdlib.h>
 #include <time.h>
+
+using std::dec;
+
+#include <time.h>
+#include <sys/time.h>
+#ifdef WINDOWS_SYS
+#include <sys/timeb.h>
+#endif
+
+static double getCurrentTS()
+{
+
+#ifndef WINDOWS_SYS
+        struct timeval cts_tmp;
+        gettimeofday(&cts_tmp, NULL);
+        double cts =  (cts_tmp.tv_sec) + ((double) cts_tmp.tv_usec) / 1000000.0;
+#else
+        struct _timeb timebuf;
+        _ftime( &timebuf);
+        double cts =  (timebuf.time) + ((double) timebuf.millitm) / 1000.0;
+#endif
+        return cts;
+}
+
 const int pqihandlerzone = 34283;
 
 static const int PQI_HANDLER_NB_PRIORITY_LEVELS = 10 ;
@@ -60,7 +84,7 @@ int	pqihandler::tick()
 {
 	int moreToTick = 0;
 
-	{ 
+	{
 		RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
 
 		// tick all interfaces...
@@ -206,10 +230,10 @@ int	pqihandler::locked_HandleRsItem(RsItem *item, uint32_t& computed_size)
 {
 	computed_size = 0 ;
 	std::map<RsPeerId, SearchModule *>::iterator it;
-	pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, 
+	pqioutput(PQL_DEBUG_BASIC, pqihandlerzone,
 	  "pqihandler::HandleRsItem()");
 
-	pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, 
+	pqioutput(PQL_DEBUG_BASIC, pqihandlerzone,
 	  "pqihandler::HandleRsItem() Sending to One Channel");
 #ifdef DEBUG_TICK
         std::cerr << "pqihandler::HandleRsItem() Sending to One Channel" << std::endl;
@@ -245,7 +269,7 @@ int     pqihandler::SendRsRawItem(RsRawItem *ns)
 	pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, "pqihandler::SendRsRawItem()");
 
 	// directly send item to streamers
-	
+
 	return queueOutRsItem(ns) ;
 }
 
@@ -274,7 +298,7 @@ int pqihandler::locked_GetItems()
 		SearchModule *mod = (it -> second);
 
 		// check security... is output allowed.
-		if(0 < secpolicy_check((it -> second) -> sp, 
+		if(0 < secpolicy_check((it -> second) -> sp,
 					0, PQI_INCOMING)) // PQI_ITEM_TYPE_ITEM, PQI_INCOMING))
 		{
 			// if yes... attempt to read.
@@ -289,19 +313,19 @@ int pqihandler::locked_GetItems()
         std::cerr << std::endl;
 				}
 
-#ifdef RSITEM_DEBUG 
+#ifdef RSITEM_DEBUG
 				std::string out;
 				rs_sprintf(out, "pqihandler::GetItems() Incoming Item from: %p\n", mod -> pqi);
 				item -> print_string(out);
 
-				pqioutput(PQL_DEBUG_BASIC, 
+				pqioutput(PQL_DEBUG_BASIC,
 						pqihandlerzone, out);
 #endif
 
 				if (item->PeerId() != (mod->pqi)->PeerId())
 				{
 					/* ERROR */
-					pqioutput(PQL_ALERT, 
+					pqioutput(PQL_ALERT,
 						pqihandlerzone, "ERROR PeerIds dont match!");
 					item->PeerId(mod->pqi->PeerId());
 				}
@@ -338,7 +362,7 @@ void pqihandler::locked_SortnStoreItem(RsItem *item)
 	/* whole Version reserved for SERVICES/CACHES */
 	if (vers == RS_PKT_VERSION_SERVICE)
 	{
-	    pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, 
+	    pqioutput(PQL_DEBUG_BASIC, pqihandlerzone,
 	      "SortnStore -> Service");
 	    in_service.push_back(item);
 	    item = NULL;
@@ -440,10 +464,16 @@ int     pqihandler::ExtractRates(std::map<RsPeerId, RsBwRates> &ratemap, RsBwRat
 
 
 
-// internal fn to send updates 
+// internal fn to send updates
 int     pqihandler::UpdateRates()
 {
+#define PQI_HDL_DEBUG_UR = 1
+#ifdef PQI_HDL_DEBUG_UR
+	uint64_t t_now;
+#endif
+
 	std::map<RsPeerId, SearchModule *>::iterator it;
+
 	int num_sm = mods.size();
 
 	float avail_in = getMaxRate(true);
@@ -451,32 +481,126 @@ int     pqihandler::UpdateRates()
 
 	float used_bw_in = 0;
 	float used_bw_out = 0;
+	float used_bw_in_table[num_sm];         /* table of in bandwidth currently used by each module */
+	float used_bw_out_table[num_sm];        /* table of out bandwidth currently used by each module */
 
 	/* Lock once rates have been retrieved */
 	RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
 
 	int effectiveUploadsSm = 0;
 	int effectiveDownloadsSm = 0;
+
 	// loop through modules to get the used bandwith and the number of modules that are affectively transfering
-	//std::cerr << " Looping through modules" << std::endl;
+#ifdef PQI_HDL_DEBUG_UR
+	std::cerr << " Looping through modules" << std::endl;
+#endif
+
+	int index = 0;
+
 	for(it = mods.begin(); it != mods.end(); ++it)
 	{
 		SearchModule *mod = (it -> second);
 		float crate_in = mod -> pqi -> getRate(true);
-		if (crate_in > 0.01 * avail_in || crate_in > 0.1)
+		if ((crate_in > 0.01 * avail_in) || (crate_in > 0.1))
 		{
 			++effectiveDownloadsSm;
 		}
 
 		float crate_out = mod -> pqi -> getRate(false);
-		if (crate_out > 0.01 * avail_out || crate_out > 0.1)
+		if ((crate_out > 0.01 * avail_out) || (crate_out > 0.1))
 		{
 			++effectiveUploadsSm;
 		}
 
 		used_bw_in += crate_in;
 		used_bw_out += crate_out;
+
+		/* fill the table of bandwidth */
+		used_bw_in_table[index] = crate_in;
+		used_bw_out_table[index] = crate_out;
+		++index;
 	}
+
+#ifdef PQI_HDL_DEBUG_UR
+	t_now = 1000 * getCurrentTS();
+	std::cerr << dec << t_now << " pqihandler::UpdateRates(): Sorting used_bw_out_table: " << num_sm << " entries" << std::endl;
+#endif
+
+	/* Sort the used bw in/out table in ascending order */
+	std::sort(used_bw_in_table, used_bw_in_table + num_sm);
+	std::sort(used_bw_out_table, used_bw_out_table + num_sm);
+
+#ifdef PQI_HDL_DEBUG_UR
+	t_now = 1000 * getCurrentTS();
+	std::cerr << dec << t_now << " pqihandler::UpdateRates(): Done." << std::endl;
+	std::cerr << dec << t_now << " pqihandler::UpdateRates(): used_bw_out " << used_bw_out << std::endl;
+#endif
+
+	/* Calculate the optimal out_max value, taking into account avail_out and the out bw requested by modules */
+
+	float out_remaining_bw = avail_out;
+	float out_max_bw = 0;
+	bool keep_going = true;
+	int mod_index = 0;
+
+	while (keep_going && (mod_index < num_sm)) {
+		float result = (num_sm - mod_index) * (used_bw_out_table[mod_index] - out_max_bw);
+		if (result > out_remaining_bw) {
+			/* There is not enough remaining out bw to satisfy all modules,
+			   distribute the remaining out bw among modules, then exit */
+			out_max_bw += out_remaining_bw / (num_sm - mod_index);
+			out_remaining_bw = 0;
+			keep_going = false;
+		} else {
+			/* Grant the requested out bandwidth to all modules,
+			   then recalculate the remaining out bandwidth */
+			out_remaining_bw -= result;
+			out_max_bw = used_bw_out_table[mod_index];
+			++mod_index;
+		}
+	}
+
+#ifdef PQI_HDL_DEBUG_UR
+	t_now = 1000 * getCurrentTS();
+	std::cerr << dec << t_now << " pqihandler::UpdateRates(): mod_index " << mod_index << " out_max_bw " << out_max_bw << " remaining out bw " << out_remaining_bw << std::endl;
+#endif
+
+	/* Allocate only half the remaining out bw, if any, to make it smoother */
+	out_max_bw = out_max_bw + out_remaining_bw / 2;
+
+	/* Calculate the optimal in_max value, taking into account avail_in and the in bw requested by modules */
+
+	float in_remaining_bw = avail_in;
+	float in_max_bw = 0;
+	keep_going = true;
+	mod_index = 0;
+
+	while (keep_going && mod_index < num_sm) {
+		float result = (num_sm - mod_index) * (used_bw_in_table[mod_index] - in_max_bw);
+		if (result > in_remaining_bw) {
+			/* There is not enough remaining in bw to satisfy all modules,
+			   distribute the remaining in bw among modules, then exit */
+			in_max_bw += in_remaining_bw / (num_sm - mod_index);
+			in_remaining_bw = 0;
+			keep_going = false;
+		} else {
+			/* Grant the requested in bandwidth to all modules,
+			   then recalculate the remaining in bandwidth */
+			in_remaining_bw -= result;
+			in_max_bw = used_bw_in_table[mod_index];
+			++mod_index;
+		}
+	}
+
+#ifdef PQI_HDL_DEBUG_UR
+	t_now = 1000 * getCurrentTS();
+	std::cerr << dec << t_now << " pqihandler::UpdateRates(): mod_index " << mod_index << " in_max_bw " << in_max_bw << " remaining in bw " << in_remaining_bw << std::endl;
+#endif
+
+	/* Allocate only half the remaining in bw, if any, to make it smoother */
+	in_max_bw = in_max_bw + in_remaining_bw / 2;
+
+
 #ifdef DEBUG_QOS
 //	std::cerr << "Totals (In) Used B/W " << used_bw_in;
 //	std::cerr << " Available B/W " << avail_in;
@@ -498,51 +622,35 @@ int     pqihandler::UpdateRates()
 	    max_out_effective = avail_out / effectiveUploadsSm;
 	}
 
-	//modify the outgoing rates if bandwith is not used well
-	float rate_out_modifier = 0;
-	if (used_bw_out / avail_out < 0.95) {
-	    rate_out_modifier = 0.001 * avail_out;
-	} else 	if (used_bw_out / avail_out > 1.05) {
-	    rate_out_modifier = - 0.001 * avail_out;
-	}
-	if (rate_out_modifier != 0) {
-	    for(it = mods.begin(); it != mods.end(); ++it)
-	    {
-		    SearchModule *mod = (it -> second);
-			mod -> pqi -> setMaxRate(false, mod -> pqi -> getMaxRate(false) + rate_out_modifier);
-	    }
+	//modify the in and out limit
+#ifdef PQI_HDL_DEBUG_UR
+	t_now = 1000 * getCurrentTS();
+	std::cerr << dec << t_now << " pqihandler::UpdateRates(): setting new out_max " << out_max_bw << " in_max " << in_max_bw << std::endl;
+#endif
+
+	for(it = mods.begin(); it != mods.end(); ++it)
+	{
+		SearchModule *mod = (it -> second);
+		mod -> pqi -> setMaxRate(true,   in_max_bw);
+		mod -> pqi -> setMaxRate(false, out_max_bw);
 	}
 
-	//modify the incoming rates if bandwith is not used well
-	float rate_in_modifier = 0;
-	if (used_bw_in / avail_in < 0.95) {
-	    rate_in_modifier = 0.001 * avail_in;
-	} else 	if (used_bw_in / avail_in > 1.05) {
-	    rate_in_modifier = - 0.001 * avail_in;
-	}
-	if (rate_in_modifier != 0) {
-	    for(it = mods.begin(); it != mods.end(); ++it)
-	    {
-		    SearchModule *mod = (it -> second);
-			mod -> pqi -> setMaxRate(true, mod -> pqi -> getMaxRate(true) + rate_in_modifier);
-	    }
-	}
 
 	//cap the rates
 	for(it = mods.begin(); it != mods.end(); ++it)
 	{
 		SearchModule *mod = (it -> second);
 		if (mod -> pqi -> getMaxRate(false) < max_out_effective) {
-		    mod -> pqi -> setMaxRate(false, max_out_effective);
+			mod -> pqi -> setMaxRate(false, max_out_effective);
 		}
 		if (mod -> pqi -> getMaxRate(false) > avail_out) {
-		    mod -> pqi -> setMaxRate(false, avail_out);
+			mod -> pqi -> setMaxRate(false, avail_out);
 		}
 		if (mod -> pqi -> getMaxRate(true) < max_in_effective) {
-		    mod -> pqi -> setMaxRate(true, max_in_effective);
+			mod -> pqi -> setMaxRate(true, max_in_effective);
 		}
 		if (mod -> pqi -> getMaxRate(true) > avail_in) {
-		    mod -> pqi -> setMaxRate(true, avail_in);
+			mod -> pqi -> setMaxRate(true, avail_in);
 		}
 	}