mirror of
https://github.com/RetroShare/RetroShare.git
synced 2025-03-01 11:21:25 -05:00
First version of QoS.
- QoS is handled by pqihandler, as a subclass of pqiqos - test program in libretroshare/src/tests/pqi/pqiqos_test.cc - RsItem now has an integer priority value QoS is driven by two parameters: - max number of priority levels - average speed ratio between two successive levels The current algorithm ensures that: - each item is treated within a constant time (which depends on its priority), whatever the order of feeding the queue - items of equal priority are treated in the same order - if kept unfed, the queue eventually gets empty. For now the ratio is 2.8 for a total of 10 priorities. All items have default priority 5, except chat items that have priority 7 for testing. Todo - adapt the speed of drawing from thepriority queue to the available out rate of pqistreamer - remove data queue from pqistreamer - setup priority values for all services. git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5_QoS@4511 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
parent
7a316eb8dd
commit
2832f912c6
@ -344,6 +344,7 @@ HEADERS += pqi/authssl.h \
|
||||
pqi/p3dhtmgr.h \
|
||||
pqi/p3notify.h \
|
||||
pqi/p3upnpmgr.h \
|
||||
pqi/pqiqos.h \
|
||||
pqi/pqi.h \
|
||||
pqi/pqi_base.h \
|
||||
pqi/pqiarchive.h \
|
||||
@ -467,6 +468,7 @@ SOURCES += pqi/authgpg.cc \
|
||||
pqi/p3netmgr.cc \
|
||||
pqi/p3dhtmgr.cc \
|
||||
pqi/p3notify.cc \
|
||||
pqi/pqiqos.cc \
|
||||
pqi/pqiarchive.cc \
|
||||
pqi/pqibin.cc \
|
||||
pqi/pqihandler.cc \
|
||||
|
@ -110,38 +110,46 @@ class NetInterface;
|
||||
**/
|
||||
class PQInterface: public RateInterface
|
||||
{
|
||||
public:
|
||||
PQInterface(const std::string &id) :peerId(id) { return; }
|
||||
virtual ~PQInterface() { return; }
|
||||
public:
|
||||
PQInterface(const std::string &id) :peerId(id) { return; }
|
||||
virtual ~PQInterface() { return; }
|
||||
|
||||
/*!
|
||||
* allows user to send RsItems to a particular facility (file, network)
|
||||
*/
|
||||
virtual int SendItem(RsItem *) = 0;
|
||||
/*!
|
||||
* allows user to send RsItems to a particular facility (file, network)
|
||||
*/
|
||||
virtual int SendItem(RsItem *) = 0;
|
||||
|
||||
/*!
|
||||
* Retrieve RsItem from a facility
|
||||
*/
|
||||
virtual RsItem *GetItem() = 0;
|
||||
// this function is overloaded in classes that need the serilized size to be returned.
|
||||
virtual int SendItem(RsItem *item,uint32_t& size)
|
||||
{
|
||||
size = 0 ;
|
||||
std::cerr << "Warning: PQInterface::SendItem(RsItem*,uint32_t&) calledbut not overloaded! Serialized size will not be returned." << std::endl;
|
||||
return SendItem(item) ;
|
||||
}
|
||||
|
||||
/**
|
||||
* also there are tick + person id functions.
|
||||
*/
|
||||
virtual int tick() { return 0; }
|
||||
virtual int status() { return 0; }
|
||||
virtual std::string PeerId() { return peerId; }
|
||||
/*!
|
||||
* Retrieve RsItem from a facility
|
||||
*/
|
||||
virtual RsItem *GetItem() = 0;
|
||||
|
||||
// the callback from NetInterface Connection Events.
|
||||
virtual int notifyEvent(NetInterface *ni, int event)
|
||||
{
|
||||
(void) ni; /* remove unused parameter warnings */
|
||||
(void) event; /* remove unused parameter warnings */
|
||||
return 0;
|
||||
}
|
||||
/**
|
||||
* also there are tick + person id functions.
|
||||
*/
|
||||
virtual int tick() { return 0; }
|
||||
virtual int status() { return 0; }
|
||||
virtual std::string PeerId() { return peerId; }
|
||||
|
||||
// the callback from NetInterface Connection Events.
|
||||
virtual int notifyEvent(NetInterface *ni, int event)
|
||||
{
|
||||
(void) ni; /* remove unused parameter warnings */
|
||||
(void) event; /* remove unused parameter warnings */
|
||||
return 0;
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
std::string peerId;
|
||||
std::string peerId;
|
||||
};
|
||||
|
||||
|
||||
|
@ -33,12 +33,16 @@
|
||||
#include <stdlib.h>
|
||||
const int pqihandlerzone = 34283;
|
||||
|
||||
static const int PQI_HANDLER_NB_PRIORITY_LEVELS = 10 ;
|
||||
static const float PQI_HANDLER_NB_PRIORITY_RATIO = 2 ;
|
||||
|
||||
/****
|
||||
#define DEBUG_TICK 1
|
||||
#define RSITEM_DEBUG 1
|
||||
****/
|
||||
#define DEBUG_QOS 1
|
||||
|
||||
pqihandler::pqihandler(SecurityPolicy *Global) : coreMtx("pqihandler")
|
||||
pqihandler::pqihandler(SecurityPolicy *Global) : pqiQoS(PQI_HANDLER_NB_PRIORITY_LEVELS,PQI_HANDLER_NB_PRIORITY_RATIO),coreMtx("pqihandler")
|
||||
{
|
||||
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
|
||||
|
||||
@ -60,6 +64,9 @@ pqihandler::pqihandler(SecurityPolicy *Global) : coreMtx("pqihandler")
|
||||
rateIndiv_in = 0.01;
|
||||
rateMax_out = 0.01;
|
||||
rateMax_in = 0.01;
|
||||
last_m = time(NULL) ;
|
||||
nb_ticks = 0 ;
|
||||
ticks_per_sec = 5 ; // initial guess
|
||||
return;
|
||||
}
|
||||
|
||||
@ -67,34 +74,85 @@ int pqihandler::tick()
|
||||
{
|
||||
int moreToTick = 0;
|
||||
|
||||
{ RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
|
||||
{
|
||||
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
|
||||
|
||||
// tick all interfaces...
|
||||
std::map<std::string, SearchModule *>::iterator it;
|
||||
for(it = mods.begin(); it != mods.end(); it++)
|
||||
{
|
||||
if (0 < ((it -> second) -> pqi) -> tick())
|
||||
// tick all interfaces...
|
||||
std::map<std::string, SearchModule *>::iterator it;
|
||||
for(it = mods.begin(); it != mods.end(); it++)
|
||||
{
|
||||
if (0 < ((it -> second) -> pqi) -> tick())
|
||||
{
|
||||
#ifdef DEBUG_TICK
|
||||
std::cerr << "pqihandler::tick() moreToTick from mod()" << std::endl;
|
||||
#endif
|
||||
moreToTick = 1;
|
||||
}
|
||||
}
|
||||
// get the items, and queue them correctly
|
||||
if (0 < locked_GetItems())
|
||||
{
|
||||
#ifdef DEBUG_TICK
|
||||
std::cerr << "pqihandler::tick() moreToTick from mod()" << std::endl;
|
||||
std::cerr << "pqihandler::tick() moreToTick from GetItems()" << std::endl;
|
||||
#endif
|
||||
moreToTick = 1;
|
||||
}
|
||||
|
||||
}
|
||||
// get the items, and queue them correctly
|
||||
if (0 < locked_GetItems())
|
||||
{
|
||||
#ifdef DEBUG_TICK
|
||||
std::cerr << "pqihandler::tick() moreToTick from GetItems()" << std::endl;
|
||||
#endif
|
||||
moreToTick = 1;
|
||||
}
|
||||
} /****** UNLOCK ******/
|
||||
|
||||
// send items from QoS queue
|
||||
|
||||
moreToTick |= drawFromQoS_queue() ;
|
||||
|
||||
UpdateRates();
|
||||
return moreToTick;
|
||||
}
|
||||
|
||||
bool pqihandler::drawFromQoS_queue()
|
||||
{
|
||||
//std::cerr<< "pqihandler: Queue has " << qos_queue_size() << " elements. Treating " << 1+qos_queue_size()/10 << " of them." << std::endl;
|
||||
|
||||
++nb_ticks ;
|
||||
time_t now = time(NULL) ;
|
||||
if(last_m + 3 < now)
|
||||
{
|
||||
ticks_per_sec = nb_ticks / (float)(now - last_m) ;
|
||||
nb_ticks = 0 ;
|
||||
last_m = now ;
|
||||
}
|
||||
float avail_out = getMaxRate(false) * 1024 / ticks_per_sec ;
|
||||
|
||||
std::cerr << "ticks per sec: " << ticks_per_sec << ", max rate in bytes/s = " << getMaxRate(false)*1024 << ", avail out per tick= " << avail_out << std::endl;
|
||||
|
||||
uint64_t total_bytes_sent = 0 ;
|
||||
for(uint32_t i=0;i<qos_queue_size() && total_bytes_sent < avail_out;++i)
|
||||
{
|
||||
RsItem *item = out_rsItem() ;
|
||||
|
||||
if(item != NULL)
|
||||
{
|
||||
//
|
||||
uint32_t size ;
|
||||
HandleRsItem(item, 0, size);
|
||||
total_bytes_sent += size ;
|
||||
std::cerr << "treating item " << (void*)item << ", priority " << (int)item->priority_level() << ", size=" << size << std::endl;
|
||||
}
|
||||
}
|
||||
std::cerr << "total bytes sent = " << total_bytes_sent << std::endl;
|
||||
|
||||
return (qos_queue_size() > 0) ;
|
||||
}
|
||||
|
||||
|
||||
bool pqihandler::queueOutRsItem(RsItem *item)
|
||||
{
|
||||
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
|
||||
in_rsItem(item) ;
|
||||
#ifdef DEBUG_QOS
|
||||
print() ;
|
||||
#endif
|
||||
return true ;
|
||||
}
|
||||
|
||||
int pqihandler::status()
|
||||
{
|
||||
@ -200,10 +258,11 @@ int pqihandler::locked_checkOutgoingRsItem(RsItem *item, int global)
|
||||
|
||||
|
||||
// generalised output
|
||||
int pqihandler::HandleRsItem(RsItem *item, int allowglobal)
|
||||
int pqihandler::HandleRsItem(RsItem *item, int allowglobal,uint32_t& computed_size)
|
||||
{
|
||||
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
|
||||
|
||||
computed_size = 0 ;
|
||||
std::map<std::string, SearchModule *>::iterator it;
|
||||
pqioutput(PQL_DEBUG_BASIC, pqihandlerzone,
|
||||
"pqihandler::HandleRsItem()");
|
||||
@ -217,7 +276,7 @@ int pqihandler::HandleRsItem(RsItem *item, int allowglobal)
|
||||
out << " Cannot send out Global RsItem";
|
||||
pqioutput(PQL_ALERT, pqihandlerzone, out.str());
|
||||
#ifdef DEBUG_TICK
|
||||
std::cerr << out.str();
|
||||
std::cerr << out.str();
|
||||
#endif
|
||||
delete item;
|
||||
return -1;
|
||||
@ -271,7 +330,7 @@ int pqihandler::HandleRsItem(RsItem *item, int allowglobal)
|
||||
#endif
|
||||
|
||||
// if yes send on item.
|
||||
((it -> second) -> pqi) -> SendItem(item);
|
||||
((it -> second) -> pqi) -> SendItem(item,computed_size);
|
||||
return 1;
|
||||
}
|
||||
else
|
||||
@ -294,50 +353,51 @@ int pqihandler::HandleRsItem(RsItem *item, int allowglobal)
|
||||
|
||||
int pqihandler::SearchSpecific(RsCacheRequest *ns)
|
||||
{
|
||||
return HandleRsItem(ns, 0);
|
||||
return queueOutRsItem(ns) ;
|
||||
}
|
||||
|
||||
int pqihandler::SendSearchResult(RsCacheItem *ns)
|
||||
{
|
||||
return HandleRsItem(ns, 0);
|
||||
return queueOutRsItem(ns) ;
|
||||
}
|
||||
|
||||
int pqihandler::SendFileRequest(RsFileRequest *ns)
|
||||
{
|
||||
return HandleRsItem(ns, 0);
|
||||
return queueOutRsItem(ns) ;
|
||||
}
|
||||
|
||||
int pqihandler::SendFileData(RsFileData *ns)
|
||||
{
|
||||
return HandleRsItem(ns, 0);
|
||||
return queueOutRsItem(ns) ;
|
||||
}
|
||||
int pqihandler::SendFileChunkMapRequest(RsFileChunkMapRequest *ns)
|
||||
{
|
||||
return HandleRsItem(ns, 0);
|
||||
return queueOutRsItem(ns) ;
|
||||
}
|
||||
int pqihandler::SendFileChunkMap(RsFileChunkMap *ns)
|
||||
{
|
||||
return HandleRsItem(ns, 0);
|
||||
return queueOutRsItem(ns) ;
|
||||
}
|
||||
int pqihandler::SendFileCRC32MapRequest(RsFileCRC32MapRequest *ns)
|
||||
{
|
||||
return HandleRsItem(ns, 0);
|
||||
return queueOutRsItem(ns) ;
|
||||
}
|
||||
int pqihandler::SendFileCRC32Map(RsFileCRC32Map *ns)
|
||||
{
|
||||
return HandleRsItem(ns, 0);
|
||||
return queueOutRsItem(ns) ;
|
||||
}
|
||||
|
||||
int pqihandler::SendRsRawItem(RsRawItem *ns)
|
||||
{
|
||||
pqioutput(PQL_DEBUG_BASIC, pqihandlerzone,
|
||||
"pqihandler::SendRsRawItem()");
|
||||
#ifdef DEBUG_TICK
|
||||
std::cerr << "pqihandler::SendRsRawItem()" << std ::endl;
|
||||
#endif
|
||||
return HandleRsItem(ns, 0);
|
||||
pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, "pqihandler::SendRsRawItem()");
|
||||
|
||||
// queue the item into the QoS
|
||||
|
||||
return queueOutRsItem(ns) ;
|
||||
}
|
||||
|
||||
|
||||
|
||||
// inputs. This is a very basic
|
||||
// system that is completely biased and slow...
|
||||
// someone please fix.
|
||||
@ -695,12 +755,14 @@ int pqihandler::UpdateRates()
|
||||
used_bw_in += crate_in;
|
||||
used_bw_out += crate_out;
|
||||
}
|
||||
#ifdef DEBUG_QOS
|
||||
// std::cerr << "Totals (In) Used B/W " << used_bw_in;
|
||||
// std::cerr << " Available B/W " << avail_in;
|
||||
// std::cerr << " Effective transfers " << effectiveDownloadsSm << std::endl;
|
||||
// std::cerr << "Totals (Out) Used B/W " << used_bw_out;
|
||||
// std::cerr << " Available B/W " << avail_out;
|
||||
// std::cerr << " Effective transfers " << effectiveUploadsSm << std::endl;
|
||||
#endif
|
||||
|
||||
locked_StoreCurrentRates(used_bw_in, used_bw_out);
|
||||
|
||||
|
@ -28,6 +28,7 @@
|
||||
|
||||
#include "pqi/pqi.h"
|
||||
#include "pqi/pqisecurity.h"
|
||||
#include "pqi/pqiqos.h"
|
||||
|
||||
#include "util/rsthreads.h"
|
||||
|
||||
@ -45,85 +46,91 @@ class SearchModule
|
||||
// Presents a P3 Face to the world!
|
||||
// and funnels data through to a PQInterface.
|
||||
//
|
||||
class pqihandler: public P3Interface
|
||||
class pqihandler: public P3Interface, public pqiQoS
|
||||
{
|
||||
public:
|
||||
pqihandler(SecurityPolicy *Global);
|
||||
bool AddSearchModule(SearchModule *mod);
|
||||
bool RemoveSearchModule(SearchModule *mod);
|
||||
pqihandler(SecurityPolicy *Global);
|
||||
bool AddSearchModule(SearchModule *mod);
|
||||
bool RemoveSearchModule(SearchModule *mod);
|
||||
|
||||
// P3Interface.
|
||||
virtual int SearchSpecific(RsCacheRequest *ns);
|
||||
virtual int SendSearchResult(RsCacheItem *);
|
||||
// P3Interface.
|
||||
virtual int SearchSpecific(RsCacheRequest *ns);
|
||||
virtual int SendSearchResult(RsCacheItem *);
|
||||
|
||||
// inputs.
|
||||
virtual RsCacheRequest * RequestedSearch();
|
||||
virtual RsCacheItem * GetSearchResult();
|
||||
// inputs.
|
||||
virtual RsCacheRequest * RequestedSearch();
|
||||
virtual RsCacheItem * GetSearchResult();
|
||||
|
||||
// file i/o
|
||||
virtual int SendFileRequest(RsFileRequest *ns);
|
||||
virtual int SendFileData(RsFileData *ns);
|
||||
virtual int SendFileChunkMapRequest(RsFileChunkMapRequest *ns);
|
||||
virtual int SendFileChunkMap(RsFileChunkMap *ns);
|
||||
virtual int SendFileCRC32MapRequest(RsFileCRC32MapRequest *ns);
|
||||
virtual int SendFileCRC32Map(RsFileCRC32Map *ns);
|
||||
virtual RsFileRequest *GetFileRequest();
|
||||
virtual RsFileData *GetFileData();
|
||||
virtual RsFileChunkMapRequest *GetFileChunkMapRequest();
|
||||
virtual RsFileChunkMap *GetFileChunkMap();
|
||||
virtual RsFileCRC32MapRequest *GetFileCRC32MapRequest();
|
||||
virtual RsFileCRC32Map *GetFileCRC32Map();
|
||||
// file i/o
|
||||
virtual int SendFileRequest(RsFileRequest *ns);
|
||||
virtual int SendFileData(RsFileData *ns);
|
||||
virtual int SendFileChunkMapRequest(RsFileChunkMapRequest *ns);
|
||||
virtual int SendFileChunkMap(RsFileChunkMap *ns);
|
||||
virtual int SendFileCRC32MapRequest(RsFileCRC32MapRequest *ns);
|
||||
virtual int SendFileCRC32Map(RsFileCRC32Map *ns);
|
||||
virtual RsFileRequest *GetFileRequest();
|
||||
virtual RsFileData *GetFileData();
|
||||
virtual RsFileChunkMapRequest *GetFileChunkMapRequest();
|
||||
virtual RsFileChunkMap *GetFileChunkMap();
|
||||
virtual RsFileCRC32MapRequest *GetFileCRC32MapRequest();
|
||||
virtual RsFileCRC32Map *GetFileCRC32Map();
|
||||
|
||||
// Rest of P3Interface
|
||||
virtual int tick();
|
||||
virtual int status();
|
||||
// Rest of P3Interface
|
||||
virtual int tick();
|
||||
virtual int status();
|
||||
|
||||
// Service Data Interface
|
||||
virtual int SendRsRawItem(RsRawItem *);
|
||||
virtual RsRawItem *GetRsRawItem();
|
||||
// Service Data Interface
|
||||
virtual int SendRsRawItem(RsRawItem *);
|
||||
virtual RsRawItem *GetRsRawItem();
|
||||
|
||||
// rate control.
|
||||
//indiv rate is deprecated
|
||||
//void setMaxIndivRate(bool in, float val);
|
||||
//float getMaxIndivRate(bool in);
|
||||
void setMaxRate(bool in, float val);
|
||||
float getMaxRate(bool in);
|
||||
// rate control.
|
||||
//indiv rate is deprecated
|
||||
//void setMaxIndivRate(bool in, float val);
|
||||
//float getMaxIndivRate(bool in);
|
||||
void setMaxRate(bool in, float val);
|
||||
float getMaxRate(bool in);
|
||||
|
||||
void getCurrentRates(float &in, float &out);
|
||||
void getCurrentRates(float &in, float &out);
|
||||
|
||||
bool drawFromQoS_queue() ;
|
||||
|
||||
protected:
|
||||
/* check to be overloaded by those that can
|
||||
* generates warnings otherwise
|
||||
*/
|
||||
/* check to be overloaded by those that can
|
||||
* generates warnings otherwise
|
||||
*/
|
||||
|
||||
int HandleRsItem(RsItem *ns, int allowglobal);
|
||||
int HandleRsItem(RsItem *ns, int allowglobal,uint32_t& size);
|
||||
bool queueOutRsItem(RsItem *) ;
|
||||
|
||||
virtual int locked_checkOutgoingRsItem(RsItem *item, int global);
|
||||
int locked_GetItems();
|
||||
void locked_SortnStoreItem(RsItem *item);
|
||||
virtual int locked_checkOutgoingRsItem(RsItem *item, int global);
|
||||
int locked_GetItems();
|
||||
void locked_SortnStoreItem(RsItem *item);
|
||||
|
||||
RsMutex coreMtx; /* MUTEX */
|
||||
RsMutex coreMtx; /* MUTEX */
|
||||
|
||||
std::map<std::string, SearchModule *> mods;
|
||||
SecurityPolicy *globsec;
|
||||
std::map<std::string, SearchModule *> mods;
|
||||
SecurityPolicy *globsec;
|
||||
|
||||
// Temporary storage...
|
||||
std::list<RsItem *> in_result, in_search, in_request, in_data, in_service,in_chunkmap,in_chunkmap_request,in_crc32map_request,in_crc32map;
|
||||
// Temporary storage...
|
||||
std::list<RsItem *> in_result, in_search, in_request, in_data, in_service,in_chunkmap,in_chunkmap_request,in_crc32map_request,in_crc32map;
|
||||
|
||||
private:
|
||||
|
||||
// rate control.
|
||||
int UpdateRates();
|
||||
void locked_StoreCurrentRates(float in, float out);
|
||||
// rate control.
|
||||
int UpdateRates();
|
||||
void locked_StoreCurrentRates(float in, float out);
|
||||
|
||||
float rateIndiv_in;
|
||||
float rateIndiv_out;
|
||||
float rateMax_in;
|
||||
float rateMax_out;
|
||||
float rateIndiv_in;
|
||||
float rateIndiv_out;
|
||||
float rateMax_in;
|
||||
float rateMax_out;
|
||||
|
||||
float rateTotal_in;
|
||||
float rateTotal_out;
|
||||
float rateTotal_in;
|
||||
float rateTotal_out;
|
||||
|
||||
uint32_t nb_ticks ;
|
||||
time_t last_m ;
|
||||
float ticks_per_sec ;
|
||||
};
|
||||
|
||||
//inline void pqihandler::setMaxIndivRate(bool in, float val)
|
||||
|
79
libretroshare/src/pqi/pqiqos.cc
Normal file
79
libretroshare/src/pqi/pqiqos.cc
Normal file
@ -0,0 +1,79 @@
|
||||
#include <iostream>
|
||||
#include <list>
|
||||
#include <math.h>
|
||||
#include <serialiser/rsserial.h>
|
||||
|
||||
#include "pqiqos.h"
|
||||
|
||||
pqiQoS::pqiQoS(uint32_t nb_levels,float alpha)
|
||||
: _item_queues(nb_levels),_alpha(alpha)
|
||||
{
|
||||
assert(pow(alpha,nb_levels) < 1e+20) ;
|
||||
|
||||
float c = 1.0f ;
|
||||
float inc = alpha ;
|
||||
_nb_items = 0 ;
|
||||
|
||||
for(int i=((int)nb_levels)-1;i>=0;--i,c *= alpha)
|
||||
{
|
||||
_item_queues[i]._threshold = c ;
|
||||
_item_queues[i]._counter = 0 ;
|
||||
_item_queues[i]._inc = inc ;
|
||||
}
|
||||
}
|
||||
|
||||
void pqiQoS::print() const
|
||||
{
|
||||
std::cerr << "pqiQoS: " << _item_queues.size() << " levels, alpha=" << _alpha ;
|
||||
std::cerr << " Size = " << _nb_items ;
|
||||
std::cerr << " Queues: " ;
|
||||
for(uint32_t i=0;i<_item_queues.size();++i)
|
||||
std::cerr << _item_queues[i]._items.size() << " " ;
|
||||
std::cerr << std::endl;
|
||||
}
|
||||
|
||||
void pqiQoS::in_rsItem(RsItem *item)
|
||||
{
|
||||
if(item->priority_level() >= _item_queues.size())
|
||||
{
|
||||
std::cerr << "pqiQoS::in_rsRawItem() ****Warning****: priority " << item->priority_level() << " out of scope [0," << _item_queues.size()-1 << "]. Priority will be clamped to maximum value." << std::endl;
|
||||
item->setPriorityLevel(_item_queues.size()-1) ;
|
||||
}
|
||||
|
||||
_item_queues[item->priority_level()].push(item) ;
|
||||
++_nb_items ;
|
||||
}
|
||||
|
||||
RsItem *pqiQoS::out_rsItem()
|
||||
{
|
||||
// Go through the queues. Increment counters.
|
||||
|
||||
if(_nb_items == 0)
|
||||
return NULL ;
|
||||
|
||||
float inc = 1.0f ;
|
||||
int i = _item_queues.size()-1 ;
|
||||
|
||||
while(i > 0 && _item_queues[i]._items.empty())
|
||||
--i, inc = _item_queues[i]._inc ;
|
||||
|
||||
int last = i ;
|
||||
|
||||
for(int j=i;j>=0;--j)
|
||||
if( (!_item_queues[j]._items.empty()) && ((_item_queues[j]._counter += inc) >= _item_queues[j]._threshold ))
|
||||
{
|
||||
last = j ;
|
||||
_item_queues[j]._counter -= _item_queues[j]._threshold ;
|
||||
}
|
||||
|
||||
if(last >= 0)
|
||||
{
|
||||
assert(_nb_items > 0) ;
|
||||
--_nb_items ;
|
||||
return _item_queues[last].pop();
|
||||
}
|
||||
else
|
||||
return NULL ;
|
||||
}
|
||||
|
||||
|
89
libretroshare/src/pqi/pqiqos.h
Normal file
89
libretroshare/src/pqi/pqiqos.h
Normal file
@ -0,0 +1,89 @@
|
||||
/*
|
||||
* libretroshare/src/pqi: pqiqos.h
|
||||
*
|
||||
* 3P/PQI network interface for RetroShare.
|
||||
*
|
||||
* Copyright 2004-2008 by Cyril Soler
|
||||
*
|
||||
* This library is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Library General Public
|
||||
* License Version 2 as published by the Free Software Foundation.
|
||||
*
|
||||
* This library is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
* Library General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Library General Public
|
||||
* License along with this library; if not, write to the Free Software
|
||||
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
|
||||
* USA.
|
||||
*
|
||||
* Please report all bugs and problems to "csoler@users.sourceforge.net"
|
||||
*
|
||||
*/
|
||||
|
||||
// This class handles the prioritisation of RsItem, based on the
|
||||
// priority level. The QoS algorithm must ensure that:
|
||||
//
|
||||
// - lower priority items get out with lower rate than high priority items
|
||||
// - items of equal priority get out of the queue in the same order than they got in
|
||||
// - items of level n+1 are output \alpha times more often than items of level n.
|
||||
// \alpha is a constant that is not necessarily an integer, but strictly > 1.
|
||||
// - the set of possible priority levels is finite, and pre-determined.
|
||||
//
|
||||
#include <vector>
|
||||
#include <list>
|
||||
|
||||
class RsItem ;
|
||||
|
||||
class pqiQoS
|
||||
{
|
||||
public:
|
||||
pqiQoS(uint32_t max_levels,float alpha) ;
|
||||
|
||||
class ItemQueue
|
||||
{
|
||||
public:
|
||||
RsItem *pop()
|
||||
{
|
||||
if(_items.empty())
|
||||
return NULL ;
|
||||
|
||||
RsItem *item = _items.front() ;
|
||||
_items.pop_front() ;
|
||||
|
||||
return item ;
|
||||
}
|
||||
|
||||
void push(RsItem *item)
|
||||
{
|
||||
_items.push_back(item) ;
|
||||
}
|
||||
|
||||
float _threshold ;
|
||||
float _counter ;
|
||||
float _inc ;
|
||||
std::list<RsItem*> _items ;
|
||||
};
|
||||
|
||||
// This function pops items from the queue, y order of priority
|
||||
//
|
||||
RsItem *out_rsItem() ;
|
||||
|
||||
// This function is used to queue items.
|
||||
//
|
||||
void in_rsItem(RsItem *item) ;
|
||||
|
||||
void print() const ;
|
||||
uint64_t qos_queue_size() const { return _nb_items ; }
|
||||
|
||||
private:
|
||||
// This vector stores the lists of items with equal priorities.
|
||||
//
|
||||
std::vector<ItemQueue> _item_queues ;
|
||||
float _alpha ;
|
||||
uint64_t _nb_items ;
|
||||
};
|
||||
|
||||
|
@ -166,7 +166,7 @@ pqistreamer::~pqistreamer()
|
||||
|
||||
|
||||
// Get/Send Items.
|
||||
int pqistreamer::SendItem(RsItem *si)
|
||||
int pqistreamer::SendItem(RsItem *si,uint32_t& out_size)
|
||||
{
|
||||
#ifdef RSITEM_DEBUG
|
||||
{
|
||||
@ -178,7 +178,7 @@ int pqistreamer::SendItem(RsItem *si)
|
||||
}
|
||||
#endif
|
||||
|
||||
return queue_outpqi(si);
|
||||
return queue_outpqi(si,out_size);
|
||||
}
|
||||
|
||||
RsItem *pqistreamer::GetItem()
|
||||
@ -302,8 +302,9 @@ int pqistreamer::status()
|
||||
//
|
||||
/**************** HANDLE OUTGOING TRANSLATION + TRANSMISSION ******/
|
||||
|
||||
int pqistreamer::queue_outpqi(RsItem *pqi)
|
||||
int pqistreamer::queue_outpqi(RsItem *pqi,uint32_t& pktsize)
|
||||
{
|
||||
pktsize = 0 ;
|
||||
#ifdef DEBUG_PQISTREAMER
|
||||
std::cerr << "pqistreamer::queue_outpqi() called." << std::endl;
|
||||
#endif
|
||||
@ -338,7 +339,7 @@ int pqistreamer::queue_outpqi(RsItem *pqi)
|
||||
isCntrl = false ;
|
||||
}
|
||||
|
||||
uint32_t pktsize = rsSerialiser->size(pqi);
|
||||
pktsize = rsSerialiser->size(pqi);
|
||||
void *ptr = malloc(pktsize);
|
||||
|
||||
#ifdef DEBUG_PQISTREAMER
|
||||
|
@ -41,75 +41,81 @@
|
||||
|
||||
class pqistreamer: public PQInterface
|
||||
{
|
||||
public:
|
||||
pqistreamer(RsSerialiser *rss, std::string peerid, BinInterface *bio_in, int bio_flagsin);
|
||||
virtual ~pqistreamer();
|
||||
public:
|
||||
pqistreamer(RsSerialiser *rss, std::string peerid, BinInterface *bio_in, int bio_flagsin);
|
||||
virtual ~pqistreamer();
|
||||
|
||||
// PQInterface
|
||||
virtual int SendItem(RsItem *);
|
||||
virtual RsItem *GetItem();
|
||||
// PQInterface
|
||||
virtual int SendItem(RsItem *item)
|
||||
{
|
||||
std::cerr << "Warning pqistreamer::sendItem(RsItem*) should not be called. Plz call SendItem(RsItem *,uint32_t& serialized_size) instead." << std::endl;
|
||||
uint32_t serialized_size ;
|
||||
return SendItem(item,serialized_size) ;
|
||||
}
|
||||
virtual int SendItem(RsItem *,uint32_t& serialized_size);
|
||||
virtual RsItem *GetItem();
|
||||
|
||||
virtual int tick();
|
||||
virtual int status();
|
||||
virtual int tick();
|
||||
virtual int status();
|
||||
|
||||
private:
|
||||
/* Implementation */
|
||||
/* Implementation */
|
||||
|
||||
// to filter functions - detect filecancel/data and act!
|
||||
int queue_outpqi( RsItem *i);
|
||||
int handleincomingitem(RsItem *i);
|
||||
// to filter functions - detect filecancel/data and act!
|
||||
int queue_outpqi( RsItem *i,uint32_t& serialized_size);
|
||||
int handleincomingitem(RsItem *i);
|
||||
|
||||
// ticked regularly (manages out queues and sending
|
||||
// via above interfaces.
|
||||
int handleoutgoing();
|
||||
int handleincoming();
|
||||
// ticked regularly (manages out queues and sending
|
||||
// via above interfaces.
|
||||
int handleoutgoing();
|
||||
int handleincoming();
|
||||
|
||||
// Bandwidth/Streaming Management.
|
||||
float outTimeSlice();
|
||||
// Bandwidth/Streaming Management.
|
||||
float outTimeSlice();
|
||||
|
||||
int outAllowedBytes();
|
||||
void outSentBytes(int );
|
||||
int outAllowedBytes();
|
||||
void outSentBytes(int );
|
||||
|
||||
int inAllowedBytes();
|
||||
void inReadBytes(int );
|
||||
int inAllowedBytes();
|
||||
void inReadBytes(int );
|
||||
|
||||
// RsSerialiser - determines which packets can be serialised.
|
||||
RsSerialiser *rsSerialiser;
|
||||
// Binary Interface for IO, initialisated at startup.
|
||||
BinInterface *bio;
|
||||
unsigned int bio_flags; // BIN_FLAGS_NO_CLOSE | BIN_FLAGS_NO_DELETE
|
||||
// RsSerialiser - determines which packets can be serialised.
|
||||
RsSerialiser *rsSerialiser;
|
||||
// Binary Interface for IO, initialisated at startup.
|
||||
BinInterface *bio;
|
||||
unsigned int bio_flags; // BIN_FLAGS_NO_CLOSE | BIN_FLAGS_NO_DELETE
|
||||
|
||||
void *pkt_wpending; // storage for pending packet to write.
|
||||
int pkt_rpend_size; // size of pkt_rpending.
|
||||
void *pkt_rpending; // storage for read in pending packets.
|
||||
void *pkt_wpending; // storage for pending packet to write.
|
||||
int pkt_rpend_size; // size of pkt_rpending.
|
||||
void *pkt_rpending; // storage for read in pending packets.
|
||||
|
||||
enum {reading_state_packet_started=1,
|
||||
enum {reading_state_packet_started=1,
|
||||
reading_state_initial=0 } ;
|
||||
|
||||
int reading_state ;
|
||||
int failed_read_attempts ;
|
||||
int reading_state ;
|
||||
int failed_read_attempts ;
|
||||
|
||||
// Temp Storage for transient data.....
|
||||
std::list<void *> out_pkt; // Cntrl / Search / Results queue
|
||||
std::list<void *> out_data; // FileData - secondary queue.
|
||||
std::list<RsItem *> incoming;
|
||||
// Temp Storage for transient data.....
|
||||
std::list<void *> out_pkt; // Cntrl / Search / Results queue
|
||||
std::list<void *> out_data; // FileData - secondary queue.
|
||||
std::list<RsItem *> incoming;
|
||||
|
||||
// data for network stats.
|
||||
int totalRead;
|
||||
int totalSent;
|
||||
// data for network stats.
|
||||
int totalRead;
|
||||
int totalSent;
|
||||
|
||||
// these are representative (but not exact)
|
||||
int currRead;
|
||||
int currSent;
|
||||
int currReadTS; // TS from which these are measured.
|
||||
int currSentTS;
|
||||
// these are representative (but not exact)
|
||||
int currRead;
|
||||
int currSent;
|
||||
int currReadTS; // TS from which these are measured.
|
||||
int currSentTS;
|
||||
|
||||
int avgLastUpdate; // TS from which these are measured.
|
||||
float avgReadCount;
|
||||
float avgSentCount;
|
||||
int avgLastUpdate; // TS from which these are measured.
|
||||
float avgReadCount;
|
||||
float avgSentCount;
|
||||
|
||||
RsMutex streamerMtx ;
|
||||
// pthread_t thread_id;
|
||||
RsMutex streamerMtx ;
|
||||
// pthread_t thread_id;
|
||||
};
|
||||
|
||||
|
||||
|
@ -33,6 +33,7 @@
|
||||
#define RSSERIAL_DEBUG 1
|
||||
#define CHAT_DEBUG 1
|
||||
***/
|
||||
#define CHAT_DEBUG 1
|
||||
|
||||
#include <iostream>
|
||||
|
||||
@ -244,6 +245,10 @@ bool RsChatMsgItem::serialise(void *data, uint32_t& pktsize)
|
||||
std::cerr << "computed size: " << 256*((unsigned char*)data)[6]+((unsigned char*)data)[7] << std::endl ;
|
||||
#endif
|
||||
|
||||
std::cerr << "Serialization result: " ;
|
||||
for(int i=0;i<20;++i)
|
||||
std::cerr << (int)((uint8_t*)data)[i] << " " ;
|
||||
std::cerr << std::endl ;
|
||||
return ok;
|
||||
}
|
||||
|
||||
@ -379,6 +384,11 @@ RsChatMsgItem::RsChatMsgItem(void *data,uint32_t size)
|
||||
uint32_t rssize = getRsItemSize(data);
|
||||
bool ok = true ;
|
||||
|
||||
std::cerr << "Received packet result: " ;
|
||||
for(int i=0;i<20;++i)
|
||||
std::cerr << (int)((uint8_t*)data)[i] << " " ;
|
||||
std::cerr << std::endl ;
|
||||
|
||||
/* get mandatory parts first */
|
||||
ok &= getRawUInt32(data, rssize, &offset, &chatFlags);
|
||||
ok &= getRawUInt32(data, rssize, &offset, &sendTime);
|
||||
|
@ -61,7 +61,10 @@ const uint8_t RS_PKT_SUBTYPE_MSG_PARENT_TAG = 0x06;
|
||||
class RsChatItem: public RsItem
|
||||
{
|
||||
public:
|
||||
RsChatItem(uint8_t chat_subtype) : RsItem(RS_PKT_VERSION_SERVICE,RS_SERVICE_TYPE_CHAT,chat_subtype) {}
|
||||
RsChatItem(uint8_t chat_subtype) : RsItem(RS_PKT_VERSION_SERVICE,RS_SERVICE_TYPE_CHAT,chat_subtype)
|
||||
{
|
||||
setPriorityLevel(7) ;
|
||||
}
|
||||
|
||||
virtual ~RsChatItem() {}
|
||||
virtual void clear() {}
|
||||
@ -78,7 +81,9 @@ class RsChatItem: public RsItem
|
||||
class RsChatMsgItem: public RsChatItem
|
||||
{
|
||||
public:
|
||||
RsChatMsgItem() :RsChatItem(RS_PKT_SUBTYPE_DEFAULT) {}
|
||||
RsChatMsgItem() :RsChatItem(RS_PKT_SUBTYPE_DEFAULT)
|
||||
{
|
||||
}
|
||||
RsChatMsgItem(void *data,uint32_t size) ; // deserialization
|
||||
|
||||
virtual ~RsChatMsgItem() {}
|
||||
|
@ -49,7 +49,8 @@
|
||||
RsItem::RsItem(uint32_t t)
|
||||
:type(t)
|
||||
{
|
||||
return;
|
||||
_queue_type = CONTROL_QUEUE ;
|
||||
_priority_level = 5 ;
|
||||
}
|
||||
|
||||
#ifdef DO_STATISTICS
|
||||
@ -116,13 +117,14 @@ void RsItem::operator delete(void *p,size_t s)
|
||||
|
||||
RsItem::RsItem(uint8_t ver, uint8_t cls, uint8_t t, uint8_t subtype)
|
||||
{
|
||||
_priority_level = 5 ;
|
||||
_queue_type = CONTROL_QUEUE ;
|
||||
|
||||
type = (ver << 24) + (cls << 16) + (t << 8) + subtype;
|
||||
return;
|
||||
}
|
||||
|
||||
RsItem::~RsItem()
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@ -158,6 +160,8 @@ uint8_t RsItem::PacketSubType()
|
||||
/* For Service Packets */
|
||||
RsItem::RsItem(uint8_t ver, uint16_t service, uint8_t subtype)
|
||||
{
|
||||
_queue_type = CONTROL_QUEUE ;
|
||||
_priority_level = 5 ;
|
||||
type = (ver << 24) + (service << 8) + subtype;
|
||||
return;
|
||||
}
|
||||
|
@ -70,40 +70,46 @@ const uint8_t RS_PKT_SUBTYPE_DEFAULT = 0x01; /* if only one subtype */
|
||||
class RsItem: public RsMemoryManagement::SmallObject
|
||||
{
|
||||
public:
|
||||
RsItem(uint32_t t);
|
||||
RsItem(uint8_t ver, uint8_t cls, uint8_t t, uint8_t subtype);
|
||||
RsItem(uint32_t t);
|
||||
RsItem(uint8_t ver, uint8_t cls, uint8_t t, uint8_t subtype);
|
||||
#ifdef DO_STATISTICS
|
||||
void *operator new(size_t s) ;
|
||||
void operator delete(void *,size_t s) ;
|
||||
void *operator new(size_t s) ;
|
||||
void operator delete(void *,size_t s) ;
|
||||
#endif
|
||||
|
||||
virtual ~RsItem();
|
||||
virtual void clear() = 0;
|
||||
virtual std::ostream &print(std::ostream &out, uint16_t indent = 0) = 0;
|
||||
virtual ~RsItem();
|
||||
virtual void clear() = 0;
|
||||
virtual std::ostream &print(std::ostream &out, uint16_t indent = 0) = 0;
|
||||
|
||||
/* source / destination id */
|
||||
const std::string& PeerId() const { return peerId; }
|
||||
void PeerId(const std::string& id) { peerId = id; }
|
||||
/* source / destination id */
|
||||
const std::string& PeerId() const { return peerId; }
|
||||
void PeerId(const std::string& id) { peerId = id; }
|
||||
|
||||
/* complete id */
|
||||
uint32_t PacketId();
|
||||
/* complete id */
|
||||
uint32_t PacketId();
|
||||
|
||||
/* id parts */
|
||||
uint8_t PacketVersion();
|
||||
uint8_t PacketClass();
|
||||
uint8_t PacketType();
|
||||
uint8_t PacketSubType();
|
||||
/* id parts */
|
||||
uint8_t PacketVersion();
|
||||
uint8_t PacketClass();
|
||||
uint8_t PacketType();
|
||||
uint8_t PacketSubType();
|
||||
|
||||
/* For Service Packets */
|
||||
RsItem(uint8_t ver, uint16_t service, uint8_t subtype);
|
||||
uint16_t PacketService(); /* combined Packet class/type (mid 16bits) */
|
||||
/* For Service Packets */
|
||||
RsItem(uint8_t ver, uint16_t service, uint8_t subtype);
|
||||
uint16_t PacketService(); /* combined Packet class/type (mid 16bits) */
|
||||
|
||||
typedef enum { CONTROL_QUEUE, DATA_QUEUE } QueueType ;
|
||||
virtual QueueType queueType() const { return CONTROL_QUEUE ; }
|
||||
typedef enum { CONTROL_QUEUE, DATA_QUEUE } QueueType ;
|
||||
|
||||
inline RsItem::QueueType queueType() const { return _queue_type ;}
|
||||
inline void setQueueType(const RsItem::QueueType& t) { _queue_type = t ;}
|
||||
|
||||
inline uint8_t priority_level() const { return _priority_level ;}
|
||||
inline void setPriorityLevel(uint8_t l) { _priority_level = l ;}
|
||||
private:
|
||||
uint32_t type;
|
||||
std::string peerId;
|
||||
uint32_t type;
|
||||
std::string peerId;
|
||||
RsItem::QueueType _queue_type ;
|
||||
uint8_t _priority_level ;
|
||||
};
|
||||
|
||||
|
||||
@ -172,31 +178,29 @@ std::ostream &printIndent(std::ostream &out, uint16_t indent);
|
||||
class RsRawItem: public RsItem
|
||||
{
|
||||
public:
|
||||
RsRawItem(uint32_t t, uint32_t size)
|
||||
:RsItem(t), len(size), _queue_type(RsItem::CONTROL_QUEUE)
|
||||
{ data = malloc(len);}
|
||||
RsRawItem(uint32_t t, uint32_t size)
|
||||
:RsItem(t), len(size)
|
||||
{
|
||||
data = malloc(len);
|
||||
}
|
||||
|
||||
virtual ~RsRawItem()
|
||||
{
|
||||
if (data)
|
||||
free(data);
|
||||
data = NULL;
|
||||
len = 0;
|
||||
}
|
||||
virtual ~RsRawItem()
|
||||
{
|
||||
if (data)
|
||||
free(data);
|
||||
data = NULL;
|
||||
len = 0;
|
||||
}
|
||||
|
||||
uint32_t getRawLength() { return len; }
|
||||
void * getRawData() { return data; }
|
||||
uint32_t getRawLength() { return len; }
|
||||
void *getRawData() { return data; }
|
||||
|
||||
virtual void clear() { return; } /* what can it do? */
|
||||
virtual std::ostream &print(std::ostream &out, uint16_t indent = 0);
|
||||
|
||||
virtual RsItem::QueueType queueType() const { return _queue_type ;}
|
||||
void setQueueType(const RsItem::QueueType& t) { _queue_type = t ;}
|
||||
virtual void clear() { return; } /* what can it do? */
|
||||
virtual std::ostream &print(std::ostream &out, uint16_t indent = 0);
|
||||
|
||||
private:
|
||||
void *data;
|
||||
uint32_t len;
|
||||
RsItem::QueueType _queue_type ;
|
||||
void *data;
|
||||
uint32_t len;
|
||||
};
|
||||
|
||||
|
||||
|
@ -200,6 +200,7 @@ RsRawItem *p3Service::send()
|
||||
{
|
||||
raw->PeerId(si->PeerId());
|
||||
raw->setQueueType(si->queueType()) ;
|
||||
raw->setPriorityLevel(si->priority_level()) ;
|
||||
}
|
||||
|
||||
/* cleanup */
|
||||
|
@ -1,5 +1,7 @@
|
||||
|
||||
RS_TOP_DIR = ../..
|
||||
DHT_TOP_DIR = ../../../../libbitdht/src/
|
||||
|
||||
##### Define any flags that are needed for this section #######
|
||||
###############################################################
|
||||
|
||||
@ -10,6 +12,7 @@ include $(RS_TOP_DIR)/tests/scripts/config.mk
|
||||
# Generic Test Harnesses.
|
||||
TESTOBJ = conn_harness.o ppg_harness.o
|
||||
|
||||
TESTOBJ += pqiqos_test.o
|
||||
TESTOBJ += net_test.o dht_test.o net_test1.o netiface_test.o dht_test.o
|
||||
TESTOBJ += pkt_test.o pqiarchive_test.o pqiperson_test.o
|
||||
TESTOBJ += extaddrfinder_test.o dnsresolver_test.o pqiipset_test.o
|
||||
@ -17,7 +20,8 @@ TESTOBJ += p3connmgr_reset_test.o p3connmgr_connect_test.o
|
||||
#conn_test.o
|
||||
|
||||
TESTS = net_test net_test1 netiface_test pqiarchive_test pqiperson_test dnsresolver_test extaddrfinder_test
|
||||
TESTS += pqiipset_test
|
||||
TESTS += pqiipset_test
|
||||
TESTS += pqiqos_test
|
||||
TESTS += p3connmgr_reset_test p3connmgr_connect_test
|
||||
#TESTS = p3connmgr_test1
|
||||
|
||||
@ -51,10 +55,13 @@ pqiarchive_test: pqiarchive_test.o pkt_test.o
|
||||
$(CC) $(CFLAGS) -o pqiarchive_test pkt_test.o pqiarchive_test.o $(LIBS)
|
||||
|
||||
pqiperson_test: pqiperson_test.o testconnect.o
|
||||
$(CC) $(CFLAGS) -o pqiperson_test pqiperson_test.o testconnect.o $(LIBS)
|
||||
$(CC) $(CFLAGS) -o pqiperson_test pqiperson_test.o testconnect.o $(LIBS) -lstdc++
|
||||
|
||||
extaddrfinder_test: extaddrfinder_test.o
|
||||
$(CC) $(CFLAGS) -o extaddrfinder_test extaddrfinder_test.o $(LIBS)
|
||||
$(CC) $(CFLAGS) -DEXTADDRSEARCH_DEBUG=1 -o extaddrfinder_test extaddrfinder_test.o $(LIBS)
|
||||
|
||||
pqiqos_test: pqiqos_test.o
|
||||
$(CC) $(CFLAGS) -o pqiqos_test pqiqos_test.o $(LIBS)
|
||||
|
||||
dnsresolver_test: dnsresolver_test.o
|
||||
$(CC) $(CFLAGS) -o dnsresolver_test dnsresolver_test.o $(LIBS)
|
||||
|
191
libretroshare/src/tests/pqi/pqiqos_test.cc
Normal file
191
libretroshare/src/tests/pqi/pqiqos_test.cc
Normal file
@ -0,0 +1,191 @@
|
||||
#include "util/utest.h"
|
||||
|
||||
#include <iostream>
|
||||
#include <math.h>
|
||||
#include <stdint.h>
|
||||
#include "serialiser/rsserial.h"
|
||||
#include "util/rsrandom.h"
|
||||
#include "pqi/pqiqos.h"
|
||||
|
||||
INITTEST();
|
||||
|
||||
// Class of RsItem with ids to check order
|
||||
//
|
||||
class testRawItem: public RsItem
|
||||
{
|
||||
public:
|
||||
testRawItem()
|
||||
: RsItem(0,0,0), _id(_static_id++)
|
||||
{
|
||||
}
|
||||
|
||||
virtual void clear() {}
|
||||
virtual std::ostream& print(std::ostream& o, uint16_t) { return o ; }
|
||||
|
||||
static const uint32_t rs_rawitem_size = 10 ;
|
||||
static uint64_t _static_id ;
|
||||
uint64_t _id ;
|
||||
};
|
||||
uint64_t testRawItem::_static_id = 1;
|
||||
|
||||
int main()
|
||||
{
|
||||
float alpha = 3 ;
|
||||
int nb_levels = 10 ;
|
||||
|
||||
//////////////////////////////////////////////
|
||||
// 1 - Test consistency of output and order //
|
||||
//////////////////////////////////////////////
|
||||
{
|
||||
pqiQoS qos(nb_levels,alpha) ;
|
||||
|
||||
// 0 - Fill the queue with fake RsItem.
|
||||
//
|
||||
static const uint32_t pushed_items = 10000 ;
|
||||
|
||||
for(uint32_t i=0;i<pushed_items;++i)
|
||||
{
|
||||
RsItem *item = new testRawItem ;
|
||||
item->setPriorityLevel(i % nb_levels) ;
|
||||
|
||||
qos.in_rsItem(item) ;
|
||||
}
|
||||
std::cerr << "QOS is filled with: " << std::endl;
|
||||
qos.print() ;
|
||||
|
||||
// 1 - checks that all items eventually got out in the same order for
|
||||
// items of equal priority
|
||||
//
|
||||
uint32_t poped = 0;
|
||||
std::vector<uint64_t> last_ids(nb_levels,0) ;
|
||||
|
||||
while(testRawItem *item = static_cast<testRawItem*>(qos.out_rsItem()))
|
||||
{
|
||||
CHECK(last_ids[item->priority_level()] < item->_id) ;
|
||||
|
||||
last_ids[item->priority_level()] = item->_id ;
|
||||
delete item, ++poped ;
|
||||
}
|
||||
|
||||
std::cerr << "Push " << pushed_items << " items, poped " << poped << std::endl;
|
||||
|
||||
if(pushed_items != poped)
|
||||
{
|
||||
std::cerr << "Queues are: " << std::endl;
|
||||
qos.print() ;
|
||||
}
|
||||
CHECK(pushed_items == poped) ;
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////
|
||||
// 2 - tests proportionality //
|
||||
//////////////////////////////////////////////////
|
||||
{
|
||||
// Now we feed the QoS, and check that items get out with probability proportional
|
||||
// to the priority
|
||||
//
|
||||
pqiQoS qos(nb_levels,alpha) ;
|
||||
|
||||
std::cerr << "Feeding 10^6 packets to the QoS service." << std::endl;
|
||||
for(int i=0;i<1000000;++i)
|
||||
{
|
||||
if(i%10000 == 0)
|
||||
{
|
||||
fprintf(stderr,"%1.2f %% completed.\r",i/(float)1000000*100.0f) ;
|
||||
fflush(stderr) ;
|
||||
}
|
||||
testRawItem *item = new testRawItem ;
|
||||
|
||||
switch(i%5)
|
||||
{
|
||||
case 0: item->setPriorityLevel( 1 ) ; break ;
|
||||
case 1: item->setPriorityLevel( 4 ) ; break ;
|
||||
case 2: item->setPriorityLevel( 6 ) ; break ;
|
||||
case 3: item->setPriorityLevel( 7 ) ; break ;
|
||||
case 4: item->setPriorityLevel( 8 ) ; break ;
|
||||
}
|
||||
|
||||
qos.in_rsItem(item) ;
|
||||
}
|
||||
|
||||
// Now perform stats on outputs for the 10000 first elements
|
||||
|
||||
std::vector<int> hist(nb_levels,0) ;
|
||||
|
||||
for(uint32_t i=0;i<10000;++i)
|
||||
{
|
||||
RsItem *item = qos.out_rsItem() ;
|
||||
hist[item->priority_level()]++ ;
|
||||
delete item ;
|
||||
}
|
||||
|
||||
std::cerr << "Histogram: " ;
|
||||
for(uint32_t i=0;i<hist.size();++i)
|
||||
std::cerr << hist[i] << " " ;
|
||||
std::cerr << std::endl;
|
||||
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////
|
||||
// 3 - Now do a test with a thread filling the queue and another getting from it //
|
||||
///////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
{
|
||||
pqiQoS qos(nb_levels,alpha) ;
|
||||
|
||||
static const time_t duration = 60 ;
|
||||
static const int average_packet_load = 10000 ;
|
||||
time_t start = time(NULL) ;
|
||||
time_t now ;
|
||||
|
||||
while( (now = time(NULL)) < duration+start )
|
||||
{
|
||||
float in_out_ratio = 1.0f;// - 0.3*cos( (now-start)*M_PI ) ; // out over in
|
||||
|
||||
// feed a random number of packets in
|
||||
|
||||
uint32_t in_packets = RSRandom::random_u32() % average_packet_load ;
|
||||
uint32_t out_packets = (uint32_t)((RSRandom::random_u32() % average_packet_load)*in_out_ratio) ;
|
||||
|
||||
for(uint32_t i=0;i<in_packets;++i)
|
||||
{
|
||||
testRawItem *item = new testRawItem ;
|
||||
item->setPriorityLevel(i%nb_levels) ;
|
||||
qos.in_rsItem(item) ;
|
||||
}
|
||||
|
||||
// pop a random number of packets out
|
||||
|
||||
std::vector<uint64_t> last_ids(nb_levels,0) ;
|
||||
|
||||
for(uint32_t i=0;i<out_packets;++i)
|
||||
{
|
||||
testRawItem *item = static_cast<testRawItem*>(qos.out_rsItem()) ;
|
||||
|
||||
if(item == NULL)
|
||||
{
|
||||
std::cerr << "Null output !" << std::endl;
|
||||
break ;
|
||||
}
|
||||
|
||||
CHECK(last_ids[item->priority_level()] < item->_id) ;
|
||||
|
||||
last_ids[item->priority_level()] = item->_id ;
|
||||
delete item ;
|
||||
}
|
||||
|
||||
// print some info
|
||||
static time_t last = 0 ;
|
||||
if(now > last)
|
||||
{
|
||||
qos.print() ;
|
||||
last = now ;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
FINALREPORT("pqiqos_test");
|
||||
return TESTRESULT() ;
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user