From 0ed60eaf8698a78949089c2da1a30ef2c370a71e Mon Sep 17 00:00:00 2001 From: csoler Date: Sun, 4 Sep 2011 20:01:30 +0000 Subject: [PATCH] merge of QoS branch into trunk git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@4588 b45a01b8-16f6-495d-af2f-9b41ad6348cc --- libretroshare/src/libretroshare.pro | 2 + libretroshare/src/pqi/pqi_base.h | 58 +++--- libretroshare/src/pqi/pqihandler.cc | 143 +++++++++---- libretroshare/src/pqi/pqihandler.h | 121 +++++------ libretroshare/src/pqi/pqiperson.cc | 4 +- libretroshare/src/pqi/pqiperson.h | 8 +- libretroshare/src/pqi/pqistreamer.cc | 124 ++++-------- libretroshare/src/pqi/pqistreamer.h | 105 +++++----- libretroshare/src/serialiser/itempriorities.h | 75 +++++++ libretroshare/src/serialiser/rsbaseitems.h | 32 ++- libretroshare/src/serialiser/rsdiscitems.h | 16 +- libretroshare/src/serialiser/rsmsgitems.cc | 10 + libretroshare/src/serialiser/rsmsgitems.h | 15 +- libretroshare/src/serialiser/rsserial.cc | 7 +- libretroshare/src/serialiser/rsserial.h | 89 ++++---- libretroshare/src/serialiser/rsstatusitems.h | 4 +- libretroshare/src/serialiser/rsvoipitems.h | 3 +- libretroshare/src/services/p3service.cc | 12 +- libretroshare/src/tests/pqi/Makefile | 13 +- libretroshare/src/tests/pqi/pqiqos_test.cc | 191 ++++++++++++++++++ libretroshare/src/turtle/p3turtle.cc | 5 + libretroshare/src/turtle/rsturtleitem.cc | 9 + libretroshare/src/turtle/rsturtleitem.h | 25 +-- 23 files changed, 734 insertions(+), 337 deletions(-) create mode 100644 libretroshare/src/serialiser/itempriorities.h create mode 100644 libretroshare/src/tests/pqi/pqiqos_test.cc diff --git a/libretroshare/src/libretroshare.pro b/libretroshare/src/libretroshare.pro index 06ba1127a..ed225c9ad 100644 --- a/libretroshare/src/libretroshare.pro +++ b/libretroshare/src/libretroshare.pro @@ -348,6 +348,7 @@ HEADERS += pqi/authssl.h \ pqi/p3dhtmgr.h \ pqi/p3notify.h \ pqi/p3upnpmgr.h \ + pqi/pqiqos.h \ pqi/pqi.h \ pqi/pqi_base.h \ pqi/pqiarchive.h \ @@ -471,6 +472,7 @@ SOURCES += pqi/authgpg.cc \ pqi/p3netmgr.cc \ pqi/p3dhtmgr.cc \ pqi/p3notify.cc \ + pqi/pqiqos.cc \ pqi/pqiarchive.cc \ pqi/pqibin.cc \ pqi/pqihandler.cc \ diff --git a/libretroshare/src/pqi/pqi_base.h b/libretroshare/src/pqi/pqi_base.h index 654dd8383..6fd95f303 100644 --- a/libretroshare/src/pqi/pqi_base.h +++ b/libretroshare/src/pqi/pqi_base.h @@ -110,38 +110,46 @@ class NetInterface; **/ class PQInterface: public RateInterface { -public: - PQInterface(const std::string &id) :peerId(id) { return; } -virtual ~PQInterface() { return; } + public: + PQInterface(const std::string &id) :peerId(id) { return; } + virtual ~PQInterface() { return; } -/*! - * allows user to send RsItems to a particular facility (file, network) - */ -virtual int SendItem(RsItem *) = 0; + /*! + * allows user to send RsItems to a particular facility (file, network) + */ + virtual int SendItem(RsItem *) = 0; -/*! - * Retrieve RsItem from a facility - */ -virtual RsItem *GetItem() = 0; + // this function is overloaded in classes that need the serilized size to be returned. + virtual int SendItem(RsItem *item,uint32_t& size) + { + size = 0 ; + std::cerr << "Warning: PQInterface::SendItem(RsItem*,uint32_t&) calledbut not overloaded! Serialized size will not be returned." << std::endl; + return SendItem(item) ; + } -/** - * also there are tick + person id functions. - */ -virtual int tick() { return 0; } -virtual int status() { return 0; } -virtual std::string PeerId() { return peerId; } + /*! + * Retrieve RsItem from a facility + */ + virtual RsItem *GetItem() = 0; - // the callback from NetInterface Connection Events. -virtual int notifyEvent(NetInterface *ni, int event) - { - (void) ni; /* remove unused parameter warnings */ - (void) event; /* remove unused parameter warnings */ - return 0; - } + /** + * also there are tick + person id functions. + */ + virtual int tick() { return 0; } + virtual int status() { return 0; } + virtual std::string PeerId() { return peerId; } + + // the callback from NetInterface Connection Events. + virtual int notifyEvent(NetInterface *ni, int event) + { + (void) ni; /* remove unused parameter warnings */ + (void) event; /* remove unused parameter warnings */ + return 0; + } private: - std::string peerId; + std::string peerId; }; diff --git a/libretroshare/src/pqi/pqihandler.cc b/libretroshare/src/pqi/pqihandler.cc index 93bbc045e..c5f76ffb6 100644 --- a/libretroshare/src/pqi/pqihandler.cc +++ b/libretroshare/src/pqi/pqihandler.cc @@ -33,12 +33,16 @@ #include const int pqihandlerzone = 34283; +static const int PQI_HANDLER_NB_PRIORITY_LEVELS = 10 ; +static const float PQI_HANDLER_NB_PRIORITY_RATIO = 2 ; + /**** #define DEBUG_TICK 1 #define RSITEM_DEBUG 1 ****/ +//#define DEBUG_QOS 1 -pqihandler::pqihandler(SecurityPolicy *Global) : coreMtx("pqihandler") +pqihandler::pqihandler(SecurityPolicy *Global) : pqiQoS(PQI_HANDLER_NB_PRIORITY_LEVELS,PQI_HANDLER_NB_PRIORITY_RATIO),coreMtx("pqihandler") { RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ @@ -60,6 +64,9 @@ pqihandler::pqihandler(SecurityPolicy *Global) : coreMtx("pqihandler") rateIndiv_in = 0.01; rateMax_out = 0.01; rateMax_in = 0.01; + last_m = time(NULL) ; + nb_ticks = 0 ; + ticks_per_sec = 5 ; // initial guess return; } @@ -67,34 +74,96 @@ int pqihandler::tick() { int moreToTick = 0; - { RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ + { + RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ - // tick all interfaces... - std::map::iterator it; - for(it = mods.begin(); it != mods.end(); it++) - { - if (0 < ((it -> second) -> pqi) -> tick()) + // tick all interfaces... + std::map::iterator it; + for(it = mods.begin(); it != mods.end(); it++) + { + if (0 < ((it -> second) -> pqi) -> tick()) + { +#ifdef DEBUG_TICK + std::cerr << "pqihandler::tick() moreToTick from mod()" << std::endl; +#endif + moreToTick = 1; + } + } + // get the items, and queue them correctly + if (0 < locked_GetItems()) { #ifdef DEBUG_TICK - std::cerr << "pqihandler::tick() moreToTick from mod()" << std::endl; + std::cerr << "pqihandler::tick() moreToTick from GetItems()" << std::endl; #endif moreToTick = 1; } } - // get the items, and queue them correctly - if (0 < locked_GetItems()) - { -#ifdef DEBUG_TICK - std::cerr << "pqihandler::tick() moreToTick from GetItems()" << std::endl; -#endif - moreToTick = 1; - } - } /****** UNLOCK ******/ + + // send items from QoS queue + + moreToTick |= drawFromQoS_queue() ; UpdateRates(); return moreToTick; } +bool pqihandler::drawFromQoS_queue() +{ + float avail_out = getMaxRate(false) * 1024 / ticks_per_sec ; + + RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ + + ++nb_ticks ; + time_t now = time(NULL) ; + if(last_m + 3 < now) + { + ticks_per_sec = nb_ticks / (float)(now - last_m) ; + nb_ticks = 0 ; + last_m = now ; + } +#ifdef DEBUG_QOS + std::cerr << "ticks per sec: " << ticks_per_sec << ", max rate in bytes/s = " << avail_out*ticks_per_sec << ", avail out per tick= " << avail_out << std::endl; +#endif + + uint64_t total_bytes_sent = 0 ; + RsItem *item ; + + while( total_bytes_sent < avail_out && (item = out_rsItem()) != NULL) + { + // + uint32_t size ; + locked_HandleRsItem(item, 0, size); + total_bytes_sent += size ; +#ifdef DEBUG_QOS + std::cerr << "treating item " << (void*)item << ", priority " << (int)item->priority_level() << ", size=" << size << ", total = " << total_bytes_sent << ", queue size = " << qos_queue_size() << std::endl; +#endif + } +#ifdef DEBUG_QOS + assert(total_bytes_sent >= avail_out || qos_queue_size() == 0) ; + std::cerr << "total bytes sent = " << total_bytes_sent << ", " ; + if(qos_queue_size() > 0) + std::cerr << "Queue still has " << qos_queue_size() << " elements." << std::endl; + else + std::cerr << "Queue is empty." << std::endl; +#endif + + return (qos_queue_size() > 0) ; +} + + +bool pqihandler::queueOutRsItem(RsItem *item) +{ + RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ + in_rsItem(item) ; + +#ifdef DEBUG_QOS + if(item->priority_level() == QOS_PRIORITY_UNKNOWN) + std::cerr << "Caught an unprioritized item !" << std::endl; + + print() ; +#endif + return true ; +} int pqihandler::status() { @@ -200,10 +269,9 @@ int pqihandler::locked_checkOutgoingRsItem(RsItem */*item*/, int /*global*/) // generalised output -int pqihandler::HandleRsItem(RsItem *item, int allowglobal) +int pqihandler::locked_HandleRsItem(RsItem *item, int allowglobal,uint32_t& computed_size) { - RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ - + computed_size = 0 ; std::map::iterator it; pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, "pqihandler::HandleRsItem()"); @@ -217,7 +285,7 @@ int pqihandler::HandleRsItem(RsItem *item, int allowglobal) out << " Cannot send out Global RsItem"; pqioutput(PQL_ALERT, pqihandlerzone, out.str()); #ifdef DEBUG_TICK - std::cerr << out.str(); + std::cerr << out.str(); #endif delete item; return -1; @@ -271,7 +339,7 @@ int pqihandler::HandleRsItem(RsItem *item, int allowglobal) #endif // if yes send on item. - ((it -> second) -> pqi) -> SendItem(item); + ((it -> second) -> pqi) -> SendItem(item,computed_size); return 1; } else @@ -294,50 +362,51 @@ int pqihandler::HandleRsItem(RsItem *item, int allowglobal) int pqihandler::SearchSpecific(RsCacheRequest *ns) { - return HandleRsItem(ns, 0); + return queueOutRsItem(ns) ; } int pqihandler::SendSearchResult(RsCacheItem *ns) { - return HandleRsItem(ns, 0); + return queueOutRsItem(ns) ; } int pqihandler::SendFileRequest(RsFileRequest *ns) { - return HandleRsItem(ns, 0); + return queueOutRsItem(ns) ; } int pqihandler::SendFileData(RsFileData *ns) { - return HandleRsItem(ns, 0); + return queueOutRsItem(ns) ; } int pqihandler::SendFileChunkMapRequest(RsFileChunkMapRequest *ns) { - return HandleRsItem(ns, 0); + return queueOutRsItem(ns) ; } int pqihandler::SendFileChunkMap(RsFileChunkMap *ns) { - return HandleRsItem(ns, 0); + return queueOutRsItem(ns) ; } int pqihandler::SendFileCRC32MapRequest(RsFileCRC32MapRequest *ns) { - return HandleRsItem(ns, 0); + return queueOutRsItem(ns) ; } int pqihandler::SendFileCRC32Map(RsFileCRC32Map *ns) { - return HandleRsItem(ns, 0); + return queueOutRsItem(ns) ; } + int pqihandler::SendRsRawItem(RsRawItem *ns) { - pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, - "pqihandler::SendRsRawItem()"); -#ifdef DEBUG_TICK - std::cerr << "pqihandler::SendRsRawItem()" << std ::endl; -#endif - return HandleRsItem(ns, 0); + pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, "pqihandler::SendRsRawItem()"); + + // queue the item into the QoS + + return queueOutRsItem(ns) ; } + // inputs. This is a very basic // system that is completely biased and slow... // someone please fix. @@ -695,12 +764,14 @@ int pqihandler::UpdateRates() used_bw_in += crate_in; used_bw_out += crate_out; } +#ifdef DEBUG_QOS // std::cerr << "Totals (In) Used B/W " << used_bw_in; // std::cerr << " Available B/W " << avail_in; // std::cerr << " Effective transfers " << effectiveDownloadsSm << std::endl; // std::cerr << "Totals (Out) Used B/W " << used_bw_out; // std::cerr << " Available B/W " << avail_out; // std::cerr << " Effective transfers " << effectiveUploadsSm << std::endl; +#endif locked_StoreCurrentRates(used_bw_in, used_bw_out); diff --git a/libretroshare/src/pqi/pqihandler.h b/libretroshare/src/pqi/pqihandler.h index f03a9c2c9..bf33f9d29 100644 --- a/libretroshare/src/pqi/pqihandler.h +++ b/libretroshare/src/pqi/pqihandler.h @@ -28,6 +28,7 @@ #include "pqi/pqi.h" #include "pqi/pqisecurity.h" +#include "pqi/pqiqos.h" #include "util/rsthreads.h" @@ -45,85 +46,91 @@ class SearchModule // Presents a P3 Face to the world! // and funnels data through to a PQInterface. // -class pqihandler: public P3Interface +class pqihandler: public P3Interface, public pqiQoS { public: - pqihandler(SecurityPolicy *Global); -bool AddSearchModule(SearchModule *mod); -bool RemoveSearchModule(SearchModule *mod); + pqihandler(SecurityPolicy *Global); + bool AddSearchModule(SearchModule *mod); + bool RemoveSearchModule(SearchModule *mod); -// P3Interface. -virtual int SearchSpecific(RsCacheRequest *ns); -virtual int SendSearchResult(RsCacheItem *); + // P3Interface. + virtual int SearchSpecific(RsCacheRequest *ns); + virtual int SendSearchResult(RsCacheItem *); -// inputs. -virtual RsCacheRequest * RequestedSearch(); -virtual RsCacheItem * GetSearchResult(); + // inputs. + virtual RsCacheRequest * RequestedSearch(); + virtual RsCacheItem * GetSearchResult(); -// file i/o -virtual int SendFileRequest(RsFileRequest *ns); -virtual int SendFileData(RsFileData *ns); -virtual int SendFileChunkMapRequest(RsFileChunkMapRequest *ns); -virtual int SendFileChunkMap(RsFileChunkMap *ns); -virtual int SendFileCRC32MapRequest(RsFileCRC32MapRequest *ns); -virtual int SendFileCRC32Map(RsFileCRC32Map *ns); -virtual RsFileRequest *GetFileRequest(); -virtual RsFileData *GetFileData(); -virtual RsFileChunkMapRequest *GetFileChunkMapRequest(); -virtual RsFileChunkMap *GetFileChunkMap(); -virtual RsFileCRC32MapRequest *GetFileCRC32MapRequest(); -virtual RsFileCRC32Map *GetFileCRC32Map(); + // file i/o + virtual int SendFileRequest(RsFileRequest *ns); + virtual int SendFileData(RsFileData *ns); + virtual int SendFileChunkMapRequest(RsFileChunkMapRequest *ns); + virtual int SendFileChunkMap(RsFileChunkMap *ns); + virtual int SendFileCRC32MapRequest(RsFileCRC32MapRequest *ns); + virtual int SendFileCRC32Map(RsFileCRC32Map *ns); + virtual RsFileRequest *GetFileRequest(); + virtual RsFileData *GetFileData(); + virtual RsFileChunkMapRequest *GetFileChunkMapRequest(); + virtual RsFileChunkMap *GetFileChunkMap(); + virtual RsFileCRC32MapRequest *GetFileCRC32MapRequest(); + virtual RsFileCRC32Map *GetFileCRC32Map(); -// Rest of P3Interface -virtual int tick(); -virtual int status(); + // Rest of P3Interface + virtual int tick(); + virtual int status(); -// Service Data Interface -virtual int SendRsRawItem(RsRawItem *); -virtual RsRawItem *GetRsRawItem(); + // Service Data Interface + virtual int SendRsRawItem(RsRawItem *); + virtual RsRawItem *GetRsRawItem(); - // rate control. -//indiv rate is deprecated -//void setMaxIndivRate(bool in, float val); -//float getMaxIndivRate(bool in); -void setMaxRate(bool in, float val); -float getMaxRate(bool in); + // rate control. + //indiv rate is deprecated + //void setMaxIndivRate(bool in, float val); + //float getMaxIndivRate(bool in); + void setMaxRate(bool in, float val); + float getMaxRate(bool in); -void getCurrentRates(float &in, float &out); + void getCurrentRates(float &in, float &out); + bool drawFromQoS_queue() ; protected: - /* check to be overloaded by those that can - * generates warnings otherwise - */ + /* check to be overloaded by those that can + * generates warnings otherwise + */ -int HandleRsItem(RsItem *ns, int allowglobal); + int locked_HandleRsItem(RsItem *ns, int allowglobal,uint32_t& size); + bool queueOutRsItem(RsItem *) ; -virtual int locked_checkOutgoingRsItem(RsItem *item, int global); -int locked_GetItems(); -void locked_SortnStoreItem(RsItem *item); + virtual int locked_checkOutgoingRsItem(RsItem *item, int global); + int locked_GetItems(); + void locked_SortnStoreItem(RsItem *item); - RsMutex coreMtx; /* MUTEX */ + RsMutex coreMtx; /* MUTEX */ - std::map mods; - SecurityPolicy *globsec; + std::map mods; + SecurityPolicy *globsec; - // Temporary storage... - std::list in_result, in_search, in_request, in_data, in_service,in_chunkmap,in_chunkmap_request,in_crc32map_request,in_crc32map; + // Temporary storage... + std::list in_result, in_search, in_request, in_data, in_service,in_chunkmap,in_chunkmap_request,in_crc32map_request,in_crc32map; private: - // rate control. -int UpdateRates(); -void locked_StoreCurrentRates(float in, float out); + // rate control. + int UpdateRates(); + void locked_StoreCurrentRates(float in, float out); - float rateIndiv_in; - float rateIndiv_out; - float rateMax_in; - float rateMax_out; + float rateIndiv_in; + float rateIndiv_out; + float rateMax_in; + float rateMax_out; - float rateTotal_in; - float rateTotal_out; + float rateTotal_in; + float rateTotal_out; + + uint32_t nb_ticks ; + time_t last_m ; + float ticks_per_sec ; }; //inline void pqihandler::setMaxIndivRate(bool in, float val) diff --git a/libretroshare/src/pqi/pqiperson.cc b/libretroshare/src/pqi/pqiperson.cc index a514404ed..5a6c519e4 100644 --- a/libretroshare/src/pqi/pqiperson.cc +++ b/libretroshare/src/pqi/pqiperson.cc @@ -61,7 +61,7 @@ pqiperson::~pqiperson() // The PQInterface interface. -int pqiperson::SendItem(RsItem *i) +int pqiperson::SendItem(RsItem *i,uint32_t& serialized_size) { std::ostringstream out; out << "pqiperson::SendItem()"; @@ -72,7 +72,7 @@ int pqiperson::SendItem(RsItem *i) #ifdef PERSON_DEBUG std::cerr << out.str() << std::endl; #endif - return activepqi -> SendItem(i); + return activepqi -> SendItem(i,serialized_size); } else { diff --git a/libretroshare/src/pqi/pqiperson.h b/libretroshare/src/pqi/pqiperson.h index a2e27d096..7220447ee 100644 --- a/libretroshare/src/pqi/pqiperson.h +++ b/libretroshare/src/pqi/pqiperson.h @@ -120,7 +120,13 @@ int receiveHeartbeat(); int addChildInterface(uint32_t type, pqiconnect *pqi); // The PQInterface interface. -virtual int SendItem(RsItem *); +virtual int SendItem(RsItem *,uint32_t& serialized_size); +virtual int SendItem(RsItem *item) +{ + std::cerr << "Warning pqiperson::sendItem(RsItem*) should not be called. Plz call SendItem(RsItem *,uint32_t& serialized_size) instead." << std::endl; + uint32_t serialized_size ; + return SendItem(item,serialized_size) ; +} virtual RsItem *GetItem(); virtual int status(); diff --git a/libretroshare/src/pqi/pqistreamer.cc b/libretroshare/src/pqi/pqistreamer.cc index 624aff2f3..a17baf366 100644 --- a/libretroshare/src/pqi/pqistreamer.cc +++ b/libretroshare/src/pqi/pqistreamer.cc @@ -138,14 +138,6 @@ pqistreamer::~pqistreamer() free(pkt); } - // clean up outgoing (data packets) - while(out_data.size() > 0) - { - void *pkt = out_data.front(); - out_data.pop_front(); - free(pkt); - } - if (pkt_wpending) { free(pkt_wpending); @@ -166,7 +158,7 @@ pqistreamer::~pqistreamer() // Get/Send Items. -int pqistreamer::SendItem(RsItem *si) +int pqistreamer::SendItem(RsItem *si,uint32_t& out_size) { #ifdef RSITEM_DEBUG { @@ -178,7 +170,7 @@ int pqistreamer::SendItem(RsItem *si) } #endif - return queue_outpqi(si); + return queue_outpqi(si,out_size); } RsItem *pqistreamer::GetItem() @@ -201,15 +193,17 @@ RsItem *pqistreamer::GetItem() // // PQInterface int pqistreamer::tick() { - { - std::ostringstream out; - out << "pqistreamer::tick()"; - out << std::endl; - out << PeerId() << ": currRead/Sent: " << currRead << "/" << currSent; - out << std::endl; +#ifdef DEBUG_PQISTREAMER + { + std::ostringstream out; + out << "pqistreamer::tick()"; + out << std::endl; + out << PeerId() << ": currRead/Sent: " << currRead << "/" << currSent; + out << std::endl; - pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out.str()); + pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out.str()); } +#endif bio->tick(); @@ -227,13 +221,14 @@ int pqistreamer::tick() handleincoming(); handleoutgoing(); +#ifdef DEBUG_PQISTREAMER /* give details of the packets */ { - std::list::iterator it; + std::list::iterator it; std::ostringstream out; out << "pqistreamer::tick() Queued Data:"; - out << " for " << PeerId(); + out << " for " << PeerId(); if (bio->isactive()) { @@ -270,11 +265,12 @@ int pqistreamer::tick() out << std::endl; } - pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out.str()); + pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out.str()); } +#endif /* if there is more stuff in the queues */ - if ((incoming.size() > 0) || (out_pkt.size() > 0) || (out_data.size() > 0)) + if ((incoming.size() > 0) || (out_pkt.size() > 0)) { return 1; } @@ -302,8 +298,9 @@ int pqistreamer::status() // /**************** HANDLE OUTGOING TRANSLATION + TRANSMISSION ******/ -int pqistreamer::queue_outpqi(RsItem *pqi) +int pqistreamer::queue_outpqi(RsItem *pqi,uint32_t& pktsize) { + pktsize = 0 ; #ifdef DEBUG_PQISTREAMER std::cerr << "pqistreamer::queue_outpqi() called." << std::endl; #endif @@ -316,44 +313,28 @@ int pqistreamer::queue_outpqi(RsItem *pqi) if(dynamic_cast(pqi)!=NULL && (bio_flags & BIN_FLAGS_NO_DELETE)) { std::cerr << "Having file data with flags = " << bio_flags << std::endl ; - *(int*)0x0=1 ; + } + + { + std::ostringstream out; + out << "pqistreamer::queue_outpqi()"; + pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out.str()); } #endif - { - std::ostringstream out; - out << "pqistreamer::queue_outpqi()"; - pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out.str()); - } - /* decide which type of packet it is */ RsFileData *dta = dynamic_cast(pqi); // This is the old test method - bool isCntrl = (dta == NULL); - if(pqi->queueType() == RsItem::DATA_QUEUE) // this is the new test method. More general. - { -#ifdef DEBUG_PQISTREAMER - std::cerr << "PQISTREAMER:: got a data queue packet !!" << std::endl ; -#endif - isCntrl = false ; - } - - uint32_t pktsize = rsSerialiser->size(pqi); + pktsize = rsSerialiser->size(pqi); void *ptr = malloc(pktsize); #ifdef DEBUG_PQISTREAMER - std::cerr << "pqistreamer::queue_outpqi() serializing packet with packet size : " << pktsize << std::endl; + std::cerr << "pqistreamer::queue_outpqi() serializing packet with packet size : " << pktsize << std::endl; #endif - if (rsSerialiser->serialise(pqi, ptr, &pktsize)) + if (rsSerialiser->serialise(pqi, ptr, &pktsize)) { - if (isCntrl) - { - out_pkt.push_back(ptr); - } - else - { - out_data.push_back(ptr); - } + out_pkt.push_back(ptr); + if (!(bio_flags & BIN_FLAGS_NO_DELETE)) { delete pqi; @@ -382,11 +363,13 @@ int pqistreamer::queue_outpqi(RsItem *pqi) int pqistreamer::handleincomingitem(RsItem *pqi) { - { - std::ostringstream out; - out << "pqistreamer::handleincomingitem()"; - pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out.str()); +#ifdef DEBUG_PQISTREAMER + { + std::ostringstream out; + out << "pqistreamer::handleincomingitem()"; + pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out.str()); } +#endif // Use overloaded Contact function pqi -> PeerId(PeerId()); @@ -398,11 +381,13 @@ int pqistreamer::handleoutgoing() { RsStackMutex stack(streamerMtx) ; // lock out_pkt and out_data +#ifdef DEBUG_PQISTREAMER { std::ostringstream out; out << "pqistreamer::handleoutgoing()"; pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out.str()); } +#endif int maxbytes = outAllowedBytes(); int sentbytes = 0; @@ -420,21 +405,12 @@ int pqistreamer::handleoutgoing() { free(*it); it = out_pkt.erase(it); - +#ifdef DEBUG_PQISTREAMER std::ostringstream out; out << "pqistreamer::handleoutgoing() Not active -> Clearing Pkt!"; // std::cerr << out.str() ; pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out.str()); - } - for(it = out_data.begin(); it != out_data.end(); ) - { - free(*it); - it = out_data.erase(it); - - std::ostringstream out; - out << "pqistreamer::handleoutgoing() Not active -> Clearing DPkt!"; - // std::cerr << out.str() ; - pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out.str()); +#endif } /* also remove the pending packets */ @@ -478,7 +454,6 @@ int pqistreamer::handleoutgoing() // send a out_pkt., else send out_data. unless // there is a pending packet. if (!pkt_wpending) - { if (out_pkt.size() > 0) { pkt_wpending = *(out_pkt.begin()); @@ -487,22 +462,10 @@ int pqistreamer::handleoutgoing() std::cerr << "pqistreamer::handleoutgoing() getting next pkt from out_pkt queue"; std::cerr << std::endl; #endif - } - else if (out_data.size() > 0) - { - pkt_wpending = *(out_data.begin()); - out_data.pop_front(); -#ifdef DEBUG_TRANSFERS - std::cerr << "pqistreamer::handleoutgoing() getting next pkt from out_data queue"; - std::cerr << std::endl; -#endif - } - } if (pkt_wpending) { - std::ostringstream out; // write packet. len = getRsItemSize(pkt_wpending); @@ -512,6 +475,7 @@ int pqistreamer::handleoutgoing() if (len != (ss = bio->senddata(pkt_wpending, len))) { + std::ostringstream out; out << "Problems with Send Data! (only " << ss << " bytes sent" << ", total pkt size=" << len ; out << std::endl; // std::cerr << out.str() ; @@ -528,10 +492,6 @@ int pqistreamer::handleoutgoing() std::cerr << std::endl; #endif -// out << " Success!" << ", sent " << len << " bytes" << std::endl; - // std::cerr << out.str() ; - pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out.str()); - free(pkt_wpending); pkt_wpending = NULL; @@ -551,11 +511,13 @@ int pqistreamer::handleincoming() int readbytes = 0; static const int max_failed_read_attempts = 2000 ; +#ifdef DEBUG_PQISTREAMER { std::ostringstream out; out << "pqistreamer::handleincoming()"; pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out.str()); } +#endif if(!(bio->isactive())) { diff --git a/libretroshare/src/pqi/pqistreamer.h b/libretroshare/src/pqi/pqistreamer.h index 74b99c1b5..e01477b84 100644 --- a/libretroshare/src/pqi/pqistreamer.h +++ b/libretroshare/src/pqi/pqistreamer.h @@ -41,75 +41,80 @@ class pqistreamer: public PQInterface { -public: - pqistreamer(RsSerialiser *rss, std::string peerid, BinInterface *bio_in, int bio_flagsin); -virtual ~pqistreamer(); + public: + pqistreamer(RsSerialiser *rss, std::string peerid, BinInterface *bio_in, int bio_flagsin); + virtual ~pqistreamer(); -// PQInterface -virtual int SendItem(RsItem *); -virtual RsItem *GetItem(); + // PQInterface + virtual int SendItem(RsItem *item) + { + std::cerr << "Warning pqistreamer::sendItem(RsItem*) should not be called. Plz call SendItem(RsItem *,uint32_t& serialized_size) instead." << std::endl; + uint32_t serialized_size ; + return SendItem(item,serialized_size) ; + } + virtual int SendItem(RsItem *,uint32_t& serialized_size); + virtual RsItem *GetItem(); -virtual int tick(); -virtual int status(); + virtual int tick(); + virtual int status(); private: - /* Implementation */ + /* Implementation */ - // to filter functions - detect filecancel/data and act! -int queue_outpqi( RsItem *i); -int handleincomingitem(RsItem *i); + // to filter functions - detect filecancel/data and act! + int queue_outpqi( RsItem *i,uint32_t& serialized_size); + int handleincomingitem(RsItem *i); - // ticked regularly (manages out queues and sending - // via above interfaces. -int handleoutgoing(); -int handleincoming(); + // ticked regularly (manages out queues and sending + // via above interfaces. + int handleoutgoing(); + int handleincoming(); - // Bandwidth/Streaming Management. -float outTimeSlice(); + // Bandwidth/Streaming Management. + float outTimeSlice(); -int outAllowedBytes(); -void outSentBytes(int ); + int outAllowedBytes(); + void outSentBytes(int ); -int inAllowedBytes(); -void inReadBytes(int ); + int inAllowedBytes(); + void inReadBytes(int ); - // RsSerialiser - determines which packets can be serialised. - RsSerialiser *rsSerialiser; - // Binary Interface for IO, initialisated at startup. - BinInterface *bio; - unsigned int bio_flags; // BIN_FLAGS_NO_CLOSE | BIN_FLAGS_NO_DELETE + // RsSerialiser - determines which packets can be serialised. + RsSerialiser *rsSerialiser; + // Binary Interface for IO, initialisated at startup. + BinInterface *bio; + unsigned int bio_flags; // BIN_FLAGS_NO_CLOSE | BIN_FLAGS_NO_DELETE - void *pkt_wpending; // storage for pending packet to write. - int pkt_rpend_size; // size of pkt_rpending. - void *pkt_rpending; // storage for read in pending packets. + void *pkt_wpending; // storage for pending packet to write. + int pkt_rpend_size; // size of pkt_rpending. + void *pkt_rpending; // storage for read in pending packets. - enum {reading_state_packet_started=1, + enum {reading_state_packet_started=1, reading_state_initial=0 } ; - int reading_state ; - int failed_read_attempts ; + int reading_state ; + int failed_read_attempts ; - // Temp Storage for transient data..... - std::list out_pkt; // Cntrl / Search / Results queue - std::list out_data; // FileData - secondary queue. - std::list incoming; + // Temp Storage for transient data..... + std::list out_pkt; // Cntrl / Search / Results queue + std::list incoming; - // data for network stats. - int totalRead; - int totalSent; + // data for network stats. + int totalRead; + int totalSent; - // these are representative (but not exact) - int currRead; - int currSent; - int currReadTS; // TS from which these are measured. - int currSentTS; + // these are representative (but not exact) + int currRead; + int currSent; + int currReadTS; // TS from which these are measured. + int currSentTS; - int avgLastUpdate; // TS from which these are measured. - float avgReadCount; - float avgSentCount; + int avgLastUpdate; // TS from which these are measured. + float avgReadCount; + float avgSentCount; - RsMutex streamerMtx ; -// pthread_t thread_id; + RsMutex streamerMtx ; + // pthread_t thread_id; }; diff --git a/libretroshare/src/serialiser/itempriorities.h b/libretroshare/src/serialiser/itempriorities.h new file mode 100644 index 000000000..db2bb5a26 --- /dev/null +++ b/libretroshare/src/serialiser/itempriorities.h @@ -0,0 +1,75 @@ +/* + * libretroshare/src/serialiser: itempriorities.h + * + * 3P/PQI network interface for RetroShare. + * + * Copyright 2011-2011 by Cyril Soler + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License Version 2 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 + * USA. + * + * Please report all bugs and problems to "csoler@users.sourceforge.net" + * + */ + +// This file centralises QoS priorities for all transfer RsItems. +// +const uint8_t QOS_PRIORITY_UNKNOWN = 0 ; +const uint8_t QOS_PRIORITY_DEFAULT = 3 ; +const uint8_t QOS_PRIORITY_TOP = 9 ; + +// Turtle traffic +// +const uint8_t QOS_PRIORITY_RS_TURTLE_OPEN_TUNNEL = 6 ; +const uint8_t QOS_PRIORITY_RS_TURTLE_TUNNEL_OK = 6 ; +const uint8_t QOS_PRIORITY_RS_TURTLE_SEARCH_REQUEST = 5 ; +const uint8_t QOS_PRIORITY_RS_TURTLE_FILE_REQUEST = 5 ; +const uint8_t QOS_PRIORITY_RS_TURTLE_FILE_CRC_REQUEST = 5 ; +const uint8_t QOS_PRIORITY_RS_TURTLE_FILE_MAP_REQUEST = 5 ; +const uint8_t QOS_PRIORITY_RS_TURTLE_SEARCH_RESULT = 3 ; +const uint8_t QOS_PRIORITY_RS_TURTLE_FILE_DATA = 3 ; +const uint8_t QOS_PRIORITY_RS_TURTLE_FILE_CRC = 3 ; +const uint8_t QOS_PRIORITY_RS_TURTLE_FILE_MAP = 3 ; +const uint8_t QOS_PRIORITY_RS_TURTLE_GENERIC_ITEM = 3 ; +const uint8_t QOS_PRIORITY_RS_TURTLE_FORWARD_FILE_DATA= 2 ; + +// File transfer +// +const uint8_t QOS_PRIORITY_RS_FILE_REQUEST = 5 ; +const uint8_t QOS_PRIORITY_RS_FILE_CRC_REQUEST = 5 ; +const uint8_t QOS_PRIORITY_RS_FILE_MAP_REQUEST = 5 ; +const uint8_t QOS_PRIORITY_RS_CACHE_REQUEST = 4 ; +const uint8_t QOS_PRIORITY_RS_FILE_DATA = 3 ; +const uint8_t QOS_PRIORITY_RS_FILE_CRC = 3 ; +const uint8_t QOS_PRIORITY_RS_FILE_MAP = 3 ; +const uint8_t QOS_PRIORITY_RS_CACHE_ITEM = 3 ; + +// Discovery +// +const uint8_t QOS_PRIORITY_RS_DISC_HEART_BEAT = 8 ; +const uint8_t QOS_PRIORITY_RS_DISC_ASK_INFO = 2 ; +const uint8_t QOS_PRIORITY_RS_DISC_REPLY = 1 ; +const uint8_t QOS_PRIORITY_RS_DISC_VERSION = 1 ; + +// Chat/Msgs +// +const uint8_t QOS_PRIORITY_RS_CHAT_ITEM = 7 ; +const uint8_t QOS_PRIORITY_RS_CHAT_AVATAR_ITEM = 2 ; +const uint8_t QOS_PRIORITY_RS_MSG_ITEM = 2 ; +const uint8_t QOS_PRIORITY_RS_STATUS_ITEM = 2 ; + +// VOIP +// +const uint8_t QOS_PRIORITY_RS_VOIP_PING = 9 ; + diff --git a/libretroshare/src/serialiser/rsbaseitems.h b/libretroshare/src/serialiser/rsbaseitems.h index 89cd21107..100aaa6b5 100644 --- a/libretroshare/src/serialiser/rsbaseitems.h +++ b/libretroshare/src/serialiser/rsbaseitems.h @@ -55,7 +55,9 @@ class RsFileRequest: public RsItem :RsItem(RS_PKT_VERSION1, RS_PKT_CLASS_BASE, RS_PKT_TYPE_FILE, RS_PKT_SUBTYPE_FI_REQUEST) - { return; } + { + setPriorityLevel(QOS_PRIORITY_RS_FILE_REQUEST) ; + } virtual ~RsFileRequest(); virtual void clear(); std::ostream &print(std::ostream &out, uint16_t indent = 0); @@ -74,7 +76,9 @@ class RsFileData: public RsItem :RsItem(RS_PKT_VERSION1, RS_PKT_CLASS_BASE, RS_PKT_TYPE_FILE, RS_PKT_SUBTYPE_FI_DATA) - { return; } + { + setPriorityLevel(QOS_PRIORITY_RS_FILE_DATA) ; + } virtual ~RsFileData(); virtual void clear(); std::ostream &print(std::ostream &out, uint16_t indent = 0); @@ -87,7 +91,9 @@ class RsFileChunkMapRequest: public RsItem public: RsFileChunkMapRequest() :RsItem(RS_PKT_VERSION1, RS_PKT_CLASS_BASE, RS_PKT_TYPE_FILE, RS_PKT_SUBTYPE_FI_CHUNK_MAP_REQUEST) - {} + { + setPriorityLevel(QOS_PRIORITY_RS_FILE_MAP_REQUEST) ; + } virtual ~RsFileChunkMapRequest() {} virtual void clear() {} @@ -102,7 +108,9 @@ class RsFileChunkMap: public RsItem public: RsFileChunkMap() :RsItem(RS_PKT_VERSION1, RS_PKT_CLASS_BASE, RS_PKT_TYPE_FILE, RS_PKT_SUBTYPE_FI_CHUNK_MAP) - {} + { + setPriorityLevel(QOS_PRIORITY_RS_FILE_MAP) ; + } virtual ~RsFileChunkMap() {} virtual void clear() {} @@ -118,7 +126,9 @@ class RsFileCRC32MapRequest: public RsItem public: RsFileCRC32MapRequest() :RsItem(RS_PKT_VERSION1, RS_PKT_CLASS_BASE, RS_PKT_TYPE_FILE, RS_PKT_SUBTYPE_FI_CRC32_MAP_REQUEST) - {} + { + setPriorityLevel(QOS_PRIORITY_RS_FILE_CRC_REQUEST) ; + } virtual ~RsFileCRC32MapRequest() {} virtual void clear() {} @@ -132,7 +142,9 @@ class RsFileCRC32Map: public RsItem public: RsFileCRC32Map() :RsItem(RS_PKT_VERSION1, RS_PKT_CLASS_BASE, RS_PKT_TYPE_FILE, RS_PKT_SUBTYPE_FI_CRC32_MAP) - {} + { + setPriorityLevel(QOS_PRIORITY_RS_FILE_CRC) ; + } virtual ~RsFileCRC32Map() {} virtual void clear() {} @@ -190,7 +202,9 @@ class RsCacheRequest: public RsItem :RsItem(RS_PKT_VERSION1, RS_PKT_CLASS_BASE, RS_PKT_TYPE_CACHE, RS_PKT_SUBTYPE_CACHE_REQUEST) - { return; } + { + setPriorityLevel(QOS_PRIORITY_RS_CACHE_REQUEST); + } virtual ~RsCacheRequest(); virtual void clear(); std::ostream &print(std::ostream &out, uint16_t indent = 0); @@ -209,7 +223,9 @@ class RsCacheItem: public RsItem :RsItem(RS_PKT_VERSION1, RS_PKT_CLASS_BASE, RS_PKT_TYPE_CACHE, RS_PKT_SUBTYPE_CACHE_ITEM) - { return; } + { + setPriorityLevel(QOS_PRIORITY_RS_CACHE_ITEM); + } virtual ~RsCacheItem(); virtual void clear(); std::ostream &print(std::ostream &out, uint16_t indent = 0); diff --git a/libretroshare/src/serialiser/rsdiscitems.h b/libretroshare/src/serialiser/rsdiscitems.h index fa7234c73..cfa6e70a7 100644 --- a/libretroshare/src/serialiser/rsdiscitems.h +++ b/libretroshare/src/serialiser/rsdiscitems.h @@ -78,7 +78,9 @@ class RsDiscReply: public RsDiscItem RsDiscReply() :RsDiscItem(RS_PKT_SUBTYPE_DISC_REPLY) - { return; } + { + setPriorityLevel(QOS_PRIORITY_RS_DISC_REPLY); + } virtual ~RsDiscReply(); @@ -107,7 +109,9 @@ class RsDiscAskInfo: public RsDiscItem RsDiscAskInfo() :RsDiscItem(RS_PKT_SUBTYPE_DISC_ASK_INFO) - { return; } + { + setPriorityLevel(QOS_PRIORITY_RS_DISC_ASK_INFO); + } virtual ~RsDiscAskInfo(); @@ -121,7 +125,9 @@ class RsDiscVersion: public RsDiscItem { public: RsDiscVersion() :RsDiscItem(RS_PKT_SUBTYPE_DISC_VERSION) - { return; } + { + setPriorityLevel(QOS_PRIORITY_RS_DISC_VERSION); + } virtual ~RsDiscVersion(); @@ -135,7 +141,9 @@ class RsDiscHeartbeat: public RsDiscItem { public: RsDiscHeartbeat() :RsDiscItem(RS_PKT_SUBTYPE_DISC_HEARTBEAT) - { return; } + { + setPriorityLevel(QOS_PRIORITY_RS_DISC_HEART_BEAT) ; + } virtual ~RsDiscHeartbeat(); diff --git a/libretroshare/src/serialiser/rsmsgitems.cc b/libretroshare/src/serialiser/rsmsgitems.cc index 1de2b5224..4586146dd 100644 --- a/libretroshare/src/serialiser/rsmsgitems.cc +++ b/libretroshare/src/serialiser/rsmsgitems.cc @@ -33,6 +33,7 @@ #define RSSERIAL_DEBUG 1 #define CHAT_DEBUG 1 ***/ +#define CHAT_DEBUG 1 #include @@ -244,6 +245,10 @@ bool RsChatMsgItem::serialise(void *data, uint32_t& pktsize) std::cerr << "computed size: " << 256*((unsigned char*)data)[6]+((unsigned char*)data)[7] << std::endl ; #endif + std::cerr << "Serialization result: " ; + for(int i=0;i<20;++i) + std::cerr << (int)((uint8_t*)data)[i] << " " ; + std::cerr << std::endl ; return ok; } @@ -379,6 +384,11 @@ RsChatMsgItem::RsChatMsgItem(void *data,uint32_t /*size*/) uint32_t rssize = getRsItemSize(data); bool ok = true ; + std::cerr << "Received packet result: " ; + for(int i=0;i<20;++i) + std::cerr << (int)((uint8_t*)data)[i] << " " ; + std::cerr << std::endl ; + /* get mandatory parts first */ ok &= getRawUInt32(data, rssize, &offset, &chatFlags); ok &= getRawUInt32(data, rssize, &offset, &sendTime); diff --git a/libretroshare/src/serialiser/rsmsgitems.h b/libretroshare/src/serialiser/rsmsgitems.h index 9e8eb3455..19767a3ee 100644 --- a/libretroshare/src/serialiser/rsmsgitems.h +++ b/libretroshare/src/serialiser/rsmsgitems.h @@ -61,7 +61,10 @@ const uint8_t RS_PKT_SUBTYPE_MSG_PARENT_TAG = 0x06; class RsChatItem: public RsItem { public: - RsChatItem(uint8_t chat_subtype) : RsItem(RS_PKT_VERSION_SERVICE,RS_SERVICE_TYPE_CHAT,chat_subtype) {} + RsChatItem(uint8_t chat_subtype) : RsItem(RS_PKT_VERSION_SERVICE,RS_SERVICE_TYPE_CHAT,chat_subtype) + { + setPriorityLevel(QOS_PRIORITY_RS_CHAT_ITEM) ; + } virtual ~RsChatItem() {} virtual void clear() {} @@ -78,7 +81,9 @@ class RsChatItem: public RsItem class RsChatMsgItem: public RsChatItem { public: - RsChatMsgItem() :RsChatItem(RS_PKT_SUBTYPE_DEFAULT) {} + RsChatMsgItem() :RsChatItem(RS_PKT_SUBTYPE_DEFAULT) + { + } RsChatMsgItem(void *data,uint32_t size) ; // deserialization virtual ~RsChatMsgItem() {} @@ -148,7 +153,7 @@ class RsChatStatusItem: public RsChatItem class RsChatAvatarItem: public RsChatItem { public: - RsChatAvatarItem() :RsChatItem(RS_PKT_SUBTYPE_CHAT_AVATAR) {} + RsChatAvatarItem() :RsChatItem(RS_PKT_SUBTYPE_CHAT_AVATAR) {setPriorityLevel(QOS_PRIORITY_RS_CHAT_AVATAR_ITEM) ;} RsChatAvatarItem(void *data,uint32_t size) ; // deserialization virtual ~RsChatAvatarItem() ; @@ -196,12 +201,12 @@ class RsMsgItem: public RsItem RsMsgItem() :RsItem(RS_PKT_VERSION_SERVICE, RS_SERVICE_TYPE_MSG, RS_PKT_SUBTYPE_DEFAULT) - { return; } + { setPriorityLevel(QOS_PRIORITY_RS_MSG_ITEM) ; } RsMsgItem(uint16_t type) :RsItem(RS_PKT_VERSION_SERVICE, type, RS_PKT_SUBTYPE_DEFAULT) - { return; } + { setPriorityLevel(QOS_PRIORITY_RS_MSG_ITEM) ; } virtual ~RsMsgItem(); virtual void clear(); diff --git a/libretroshare/src/serialiser/rsserial.cc b/libretroshare/src/serialiser/rsserial.cc index c803970e8..49540e6e0 100644 --- a/libretroshare/src/serialiser/rsserial.cc +++ b/libretroshare/src/serialiser/rsserial.cc @@ -49,7 +49,7 @@ RsItem::RsItem(uint32_t t) :type(t) { - return; + _priority_level = QOS_PRIORITY_UNKNOWN ; // This value triggers PQIInterface to complain about undefined priorities } #ifdef DO_STATISTICS @@ -116,13 +116,13 @@ void RsItem::operator delete(void *p,size_t s) RsItem::RsItem(uint8_t ver, uint8_t cls, uint8_t t, uint8_t subtype) { + _priority_level = QOS_PRIORITY_UNKNOWN ; // This value triggers PQIInterface to complain about undefined priorities + type = (ver << 24) + (cls << 16) + (t << 8) + subtype; - return; } RsItem::~RsItem() { - return; } @@ -158,6 +158,7 @@ uint8_t RsItem::PacketSubType() /* For Service Packets */ RsItem::RsItem(uint8_t ver, uint16_t service, uint8_t subtype) { + _priority_level = QOS_PRIORITY_UNKNOWN ; // This value triggers PQIInterface to complain about undefined priorities type = (ver << 24) + (service << 8) + subtype; return; } diff --git a/libretroshare/src/serialiser/rsserial.h b/libretroshare/src/serialiser/rsserial.h index 05ca80eb8..e4d2a3a38 100644 --- a/libretroshare/src/serialiser/rsserial.h +++ b/libretroshare/src/serialiser/rsserial.h @@ -57,6 +57,7 @@ ******************************************************************/ #include +#include "itempriorities.h" const uint8_t RS_PKT_VERSION1 = 0x01; const uint8_t RS_PKT_VERSION_SERVICE = 0x02; @@ -70,40 +71,40 @@ const uint8_t RS_PKT_SUBTYPE_DEFAULT = 0x01; /* if only one subtype */ class RsItem: public RsMemoryManagement::SmallObject { public: - RsItem(uint32_t t); - RsItem(uint8_t ver, uint8_t cls, uint8_t t, uint8_t subtype); + RsItem(uint32_t t); + RsItem(uint8_t ver, uint8_t cls, uint8_t t, uint8_t subtype); #ifdef DO_STATISTICS - void *operator new(size_t s) ; - void operator delete(void *,size_t s) ; + void *operator new(size_t s) ; + void operator delete(void *,size_t s) ; #endif -virtual ~RsItem(); -virtual void clear() = 0; -virtual std::ostream &print(std::ostream &out, uint16_t indent = 0) = 0; + virtual ~RsItem(); + virtual void clear() = 0; + virtual std::ostream &print(std::ostream &out, uint16_t indent = 0) = 0; - /* source / destination id */ -const std::string& PeerId() const { return peerId; } -void PeerId(const std::string& id) { peerId = id; } + /* source / destination id */ + const std::string& PeerId() const { return peerId; } + void PeerId(const std::string& id) { peerId = id; } - /* complete id */ -uint32_t PacketId(); + /* complete id */ + uint32_t PacketId(); - /* id parts */ -uint8_t PacketVersion(); -uint8_t PacketClass(); -uint8_t PacketType(); -uint8_t PacketSubType(); + /* id parts */ + uint8_t PacketVersion(); + uint8_t PacketClass(); + uint8_t PacketType(); + uint8_t PacketSubType(); - /* For Service Packets */ - RsItem(uint8_t ver, uint16_t service, uint8_t subtype); -uint16_t PacketService(); /* combined Packet class/type (mid 16bits) */ - -typedef enum { CONTROL_QUEUE, DATA_QUEUE } QueueType ; -virtual QueueType queueType() const { return CONTROL_QUEUE ; } + /* For Service Packets */ + RsItem(uint8_t ver, uint16_t service, uint8_t subtype); + uint16_t PacketService(); /* combined Packet class/type (mid 16bits) */ + inline uint8_t priority_level() const { return _priority_level ;} + inline void setPriorityLevel(uint8_t l) { _priority_level = l ;} private: -uint32_t type; -std::string peerId; + uint32_t type; + std::string peerId; + uint8_t _priority_level ; }; @@ -172,31 +173,29 @@ std::ostream &printIndent(std::ostream &out, uint16_t indent); class RsRawItem: public RsItem { public: - RsRawItem(uint32_t t, uint32_t size) - :RsItem(t), len(size), _queue_type(RsItem::CONTROL_QUEUE) - { data = malloc(len);} + RsRawItem(uint32_t t, uint32_t size) + :RsItem(t), len(size) + { + data = malloc(len); + } -virtual ~RsRawItem() - { - if (data) - free(data); - data = NULL; - len = 0; - } + virtual ~RsRawItem() + { + if (data) + free(data); + data = NULL; + len = 0; + } -uint32_t getRawLength() { return len; } -void * getRawData() { return data; } + uint32_t getRawLength() { return len; } + void *getRawData() { return data; } -virtual void clear() { return; } /* what can it do? */ -virtual std::ostream &print(std::ostream &out, uint16_t indent = 0); - -virtual RsItem::QueueType queueType() const { return _queue_type ;} -void setQueueType(const RsItem::QueueType& t) { _queue_type = t ;} + virtual void clear() { return; } /* what can it do? */ + virtual std::ostream &print(std::ostream &out, uint16_t indent = 0); private: - void *data; - uint32_t len; - RsItem::QueueType _queue_type ; + void *data; + uint32_t len; }; diff --git a/libretroshare/src/serialiser/rsstatusitems.h b/libretroshare/src/serialiser/rsstatusitems.h index 688be6a3c..8677c2be0 100644 --- a/libretroshare/src/serialiser/rsstatusitems.h +++ b/libretroshare/src/serialiser/rsstatusitems.h @@ -37,7 +37,9 @@ class RsStatusItem: public RsItem RsStatusItem() :RsItem(RS_PKT_VERSION_SERVICE, RS_SERVICE_TYPE_STATUS, RS_PKT_SUBTYPE_DEFAULT) - { return; } + { + setPriorityLevel(QOS_PRIORITY_RS_STATUS_ITEM); + } virtual ~RsStatusItem(); virtual void clear(); std::ostream &print(std::ostream &out, uint16_t indent = 0); diff --git a/libretroshare/src/serialiser/rsvoipitems.h b/libretroshare/src/serialiser/rsvoipitems.h index 9f7a5124c..f62f21479 100644 --- a/libretroshare/src/serialiser/rsvoipitems.h +++ b/libretroshare/src/serialiser/rsvoipitems.h @@ -40,7 +40,8 @@ const uint8_t RS_PKT_SUBTYPE_VOIP_PONG = 0x02; class RsVoipItem: public RsItem { public: - RsVoipItem(uint8_t chat_subtype) : RsItem(RS_PKT_VERSION_SERVICE,RS_SERVICE_TYPE_VOIP,chat_subtype) {} + RsVoipItem(uint8_t chat_subtype) : RsItem(RS_PKT_VERSION_SERVICE,RS_SERVICE_TYPE_VOIP,chat_subtype) + { setPriorityLevel(QOS_PRIORITY_RS_VOIP_PING) ;} // should be refined later. virtual ~RsVoipItem() {}; virtual void clear() {}; diff --git a/libretroshare/src/services/p3service.cc b/libretroshare/src/services/p3service.cc index e3a40a785..a99052dfc 100644 --- a/libretroshare/src/services/p3service.cc +++ b/libretroshare/src/services/p3service.cc @@ -199,7 +199,17 @@ RsRawItem *p3Service::send() if (raw) { raw->PeerId(si->PeerId()); - raw->setQueueType(si->queueType()) ; + + if(si->priority_level() == QOS_PRIORITY_UNKNOWN) + { + std::cerr << "************************************************************" << std::endl; + std::cerr << "********** Warning: p3service::send() ********" << std::endl; + std::cerr << "********** Warning: caught a RsItem with undefined ********" << std::endl; + std::cerr << "********** priority level. That should not ********" << std::endl; + std::cerr << "********** happen. Please fix your items! ********" << std::endl; + std::cerr << "************************************************************" << std::endl; + } + raw->setPriorityLevel(si->priority_level()) ; } /* cleanup */ diff --git a/libretroshare/src/tests/pqi/Makefile b/libretroshare/src/tests/pqi/Makefile index fe7ccfaac..7a9b10193 100644 --- a/libretroshare/src/tests/pqi/Makefile +++ b/libretroshare/src/tests/pqi/Makefile @@ -1,5 +1,7 @@ RS_TOP_DIR = ../.. +DHT_TOP_DIR = ../../../../libbitdht/src/ + ##### Define any flags that are needed for this section ####### ############################################################### @@ -10,6 +12,7 @@ include $(RS_TOP_DIR)/tests/scripts/config.mk # Generic Test Harnesses. TESTOBJ = conn_harness.o ppg_harness.o +TESTOBJ += pqiqos_test.o TESTOBJ += net_test.o dht_test.o net_test1.o netiface_test.o dht_test.o TESTOBJ += pkt_test.o pqiarchive_test.o pqiperson_test.o TESTOBJ += extaddrfinder_test.o dnsresolver_test.o pqiipset_test.o @@ -17,7 +20,8 @@ TESTOBJ += p3connmgr_reset_test.o p3connmgr_connect_test.o #conn_test.o TESTS = net_test net_test1 netiface_test pqiarchive_test pqiperson_test dnsresolver_test extaddrfinder_test -TESTS += pqiipset_test +TESTS += pqiipset_test +TESTS += pqiqos_test TESTS += p3connmgr_reset_test p3connmgr_connect_test #TESTS = p3connmgr_test1 @@ -51,10 +55,13 @@ pqiarchive_test: pqiarchive_test.o pkt_test.o $(CC) $(CFLAGS) -o pqiarchive_test pkt_test.o pqiarchive_test.o $(LIBS) pqiperson_test: pqiperson_test.o testconnect.o - $(CC) $(CFLAGS) -o pqiperson_test pqiperson_test.o testconnect.o $(LIBS) + $(CC) $(CFLAGS) -o pqiperson_test pqiperson_test.o testconnect.o $(LIBS) -lstdc++ extaddrfinder_test: extaddrfinder_test.o - $(CC) $(CFLAGS) -o extaddrfinder_test extaddrfinder_test.o $(LIBS) + $(CC) $(CFLAGS) -DEXTADDRSEARCH_DEBUG=1 -o extaddrfinder_test extaddrfinder_test.o $(LIBS) + +pqiqos_test: pqiqos_test.o + $(CC) $(CFLAGS) -o pqiqos_test pqiqos_test.o $(LIBS) dnsresolver_test: dnsresolver_test.o $(CC) $(CFLAGS) -o dnsresolver_test dnsresolver_test.o $(LIBS) diff --git a/libretroshare/src/tests/pqi/pqiqos_test.cc b/libretroshare/src/tests/pqi/pqiqos_test.cc new file mode 100644 index 000000000..a62876722 --- /dev/null +++ b/libretroshare/src/tests/pqi/pqiqos_test.cc @@ -0,0 +1,191 @@ +#include "util/utest.h" + +#include +#include +#include +#include "serialiser/rsserial.h" +#include "util/rsrandom.h" +#include "pqi/pqiqos.h" + +INITTEST(); + +// Class of RsItem with ids to check order +// +class testRawItem: public RsItem +{ + public: + testRawItem() + : RsItem(0,0,0), _id(_static_id++) + { + } + + virtual void clear() {} + virtual std::ostream& print(std::ostream& o, uint16_t) { return o ; } + + static const uint32_t rs_rawitem_size = 10 ; + static uint64_t _static_id ; + uint64_t _id ; +}; +uint64_t testRawItem::_static_id = 1; + +int main() +{ + float alpha = 3 ; + int nb_levels = 10 ; + + ////////////////////////////////////////////// + // 1 - Test consistency of output and order // + ////////////////////////////////////////////// + { + pqiQoS qos(nb_levels,alpha) ; + + // 0 - Fill the queue with fake RsItem. + // + static const uint32_t pushed_items = 10000 ; + + for(uint32_t i=0;isetPriorityLevel(i % nb_levels) ; + + qos.in_rsItem(item) ; + } + std::cerr << "QOS is filled with: " << std::endl; + qos.print() ; + + // 1 - checks that all items eventually got out in the same order for + // items of equal priority + // + uint32_t poped = 0; + std::vector last_ids(nb_levels,0) ; + + while(testRawItem *item = static_cast(qos.out_rsItem())) + { + CHECK(last_ids[item->priority_level()] < item->_id) ; + + last_ids[item->priority_level()] = item->_id ; + delete item, ++poped ; + } + + std::cerr << "Push " << pushed_items << " items, poped " << poped << std::endl; + + if(pushed_items != poped) + { + std::cerr << "Queues are: " << std::endl; + qos.print() ; + } + CHECK(pushed_items == poped) ; + } + + ////////////////////////////////////////////////// + // 2 - tests proportionality // + ////////////////////////////////////////////////// + { + // Now we feed the QoS, and check that items get out with probability proportional + // to the priority + // + pqiQoS qos(nb_levels,alpha) ; + + std::cerr << "Feeding 10^6 packets to the QoS service." << std::endl; + for(int i=0;i<1000000;++i) + { + if(i%10000 == 0) + { + fprintf(stderr,"%1.2f %% completed.\r",i/(float)1000000*100.0f) ; + fflush(stderr) ; + } + testRawItem *item = new testRawItem ; + + switch(i%5) + { + case 0: item->setPriorityLevel( 1 ) ; break ; + case 1: item->setPriorityLevel( 4 ) ; break ; + case 2: item->setPriorityLevel( 6 ) ; break ; + case 3: item->setPriorityLevel( 7 ) ; break ; + case 4: item->setPriorityLevel( 8 ) ; break ; + } + + qos.in_rsItem(item) ; + } + + // Now perform stats on outputs for the 10000 first elements + + std::vector hist(nb_levels,0) ; + + for(uint32_t i=0;i<10000;++i) + { + RsItem *item = qos.out_rsItem() ; + hist[item->priority_level()]++ ; + delete item ; + } + + std::cerr << "Histogram: " ; + for(uint32_t i=0;isetPriorityLevel(i%nb_levels) ; + qos.in_rsItem(item) ; + } + + // pop a random number of packets out + + std::vector last_ids(nb_levels,0) ; + + for(uint32_t i=0;i(qos.out_rsItem()) ; + + if(item == NULL) + { + std::cerr << "Null output !" << std::endl; + break ; + } + + CHECK(last_ids[item->priority_level()] < item->_id) ; + + last_ids[item->priority_level()] = item->_id ; + delete item ; + } + + // print some info + static time_t last = 0 ; + if(now > last) + { + qos.print() ; + last = now ; + } + } + } + + FINALREPORT("pqiqos_test"); + return TESTRESULT() ; +} + + diff --git a/libretroshare/src/turtle/p3turtle.cc b/libretroshare/src/turtle/p3turtle.cc index 9de314eb8..db2937336 100644 --- a/libretroshare/src/turtle/p3turtle.cc +++ b/libretroshare/src/turtle/p3turtle.cc @@ -870,6 +870,8 @@ void p3turtle::handleSearchResult(RsTurtleSearchResultItem *item) // Routing of turtle tunnel items in a generic manner. Most tunnel packets will use this function, except packets designed for // contructing the tunnels and searching, namely TurtleSearchRequests/Results and OpenTunnel/TunnelOkItems // +// Only packets coming from handleIncoming() end up here, so this function is able to catch the transiting traffic. +// void p3turtle::routeGenericTunnelItem(RsTurtleGenericTunnelItem *item) { #ifdef P3TURTLE_DEBUG @@ -913,6 +915,9 @@ void p3turtle::routeGenericTunnelItem(RsTurtleGenericTunnelItem *item) _traffic_info_buffer.unknown_updn_Bps += static_cast(item)->serial_size() ; + if(dynamic_cast(item) != NULL) + item->setPriorityLevel(QOS_PRIORITY_RS_TURTLE_FORWARD_FILE_DATA) ; + sendItem(item) ; return ; } diff --git a/libretroshare/src/turtle/rsturtleitem.cc b/libretroshare/src/turtle/rsturtleitem.cc index c3f078df8..a4a931bc2 100644 --- a/libretroshare/src/turtle/rsturtleitem.cc +++ b/libretroshare/src/turtle/rsturtleitem.cc @@ -557,6 +557,7 @@ bool RsTurtleSearchResultItem::serialize(void *data,uint32_t& pktsize) RsTurtleFileMapItem::RsTurtleFileMapItem(void *data,uint32_t pktsize) : RsTurtleGenericTunnelItem(RS_TURTLE_SUBTYPE_FILE_MAP) { + setPriorityLevel(QOS_PRIORITY_RS_TURTLE_FILE_MAP) ; #ifdef P3TURTLE_DEBUG std::cerr << " type = file map item" << std::endl ; #endif @@ -588,6 +589,7 @@ RsTurtleFileMapItem::RsTurtleFileMapItem(void *data,uint32_t pktsize) RsTurtleFileMapRequestItem::RsTurtleFileMapRequestItem(void *data,uint32_t pktsize) : RsTurtleGenericTunnelItem(RS_TURTLE_SUBTYPE_FILE_MAP_REQUEST) { + setPriorityLevel(QOS_PRIORITY_RS_TURTLE_FILE_MAP_REQUEST) ; #ifdef P3TURTLE_DEBUG std::cerr << " type = file map request item" << std::endl ; #endif @@ -611,6 +613,7 @@ RsTurtleFileMapRequestItem::RsTurtleFileMapRequestItem(void *data,uint32_t pktsi RsTurtleFileCrcItem::RsTurtleFileCrcItem(void *data,uint32_t pktsize) : RsTurtleGenericTunnelItem(RS_TURTLE_SUBTYPE_FILE_CRC) { + setPriorityLevel(QOS_PRIORITY_RS_TURTLE_FILE_CRC) ; #ifdef P3TURTLE_DEBUG std::cerr << " type = file map item" << std::endl ; #endif @@ -645,6 +648,7 @@ RsTurtleFileCrcItem::RsTurtleFileCrcItem(void *data,uint32_t pktsize) RsTurtleFileCrcRequestItem::RsTurtleFileCrcRequestItem(void *data,uint32_t pktsize) : RsTurtleGenericTunnelItem(RS_TURTLE_SUBTYPE_FILE_CRC_REQUEST) { + setPriorityLevel(QOS_PRIORITY_RS_TURTLE_FILE_CRC_REQUEST) ; #ifdef P3TURTLE_DEBUG std::cerr << " type = file map request item" << std::endl ; #endif @@ -667,6 +671,7 @@ RsTurtleFileCrcRequestItem::RsTurtleFileCrcRequestItem(void *data,uint32_t pktsi RsTurtleSearchResultItem::RsTurtleSearchResultItem(void *data,uint32_t pktsize) : RsTurtleItem(RS_TURTLE_SUBTYPE_SEARCH_RESULT) { + setPriorityLevel(QOS_PRIORITY_RS_TURTLE_SEARCH_RESULT) ; #ifdef P3TURTLE_DEBUG std::cerr << " type = search result" << std::endl ; #endif @@ -745,6 +750,7 @@ bool RsTurtleOpenTunnelItem::serialize(void *data,uint32_t& pktsize) RsTurtleOpenTunnelItem::RsTurtleOpenTunnelItem(void *data,uint32_t pktsize) : RsTurtleItem(RS_TURTLE_SUBTYPE_OPEN_TUNNEL) { + setPriorityLevel(QOS_PRIORITY_RS_TURTLE_OPEN_TUNNEL) ; #ifdef P3TURTLE_DEBUG std::cerr << " type = open tunnel" << std::endl ; #endif @@ -808,6 +814,7 @@ bool RsTurtleTunnelOkItem::serialize(void *data,uint32_t& pktsize) RsTurtleTunnelOkItem::RsTurtleTunnelOkItem(void *data,uint32_t pktsize) : RsTurtleItem(RS_TURTLE_SUBTYPE_TUNNEL_OK) { + setPriorityLevel(QOS_PRIORITY_RS_TURTLE_TUNNEL_OK) ; #ifdef P3TURTLE_DEBUG std::cerr << " type = tunnel ok" << std::endl ; #endif @@ -870,6 +877,7 @@ bool RsTurtleFileRequestItem::serialize(void *data,uint32_t& pktsize) RsTurtleFileRequestItem::RsTurtleFileRequestItem(void *data,uint32_t pktsize) : RsTurtleGenericTunnelItem(RS_TURTLE_SUBTYPE_FILE_REQUEST) { + setPriorityLevel(QOS_PRIORITY_RS_TURTLE_FILE_REQUEST) ; #ifdef P3TURTLE_DEBUG std::cerr << " type = file request" << std::endl ; #endif @@ -903,6 +911,7 @@ RsTurtleFileDataItem::~RsTurtleFileDataItem() RsTurtleFileDataItem::RsTurtleFileDataItem(void *data,uint32_t pktsize) : RsTurtleGenericTunnelItem(RS_TURTLE_SUBTYPE_FILE_DATA) { + setPriorityLevel(QOS_PRIORITY_RS_TURTLE_FILE_DATA) ; #ifdef P3TURTLE_DEBUG std::cerr << " type = file request" << std::endl ; #endif diff --git a/libretroshare/src/turtle/rsturtleitem.h b/libretroshare/src/turtle/rsturtleitem.h index 4232e71de..ab8cb76c7 100644 --- a/libretroshare/src/turtle/rsturtleitem.h +++ b/libretroshare/src/turtle/rsturtleitem.h @@ -46,7 +46,7 @@ class RsTurtleItem: public RsItem class RsTurtleSearchResultItem: public RsTurtleItem { public: - RsTurtleSearchResultItem() : RsTurtleItem(RS_TURTLE_SUBTYPE_SEARCH_RESULT) {} + RsTurtleSearchResultItem() : RsTurtleItem(RS_TURTLE_SUBTYPE_SEARCH_RESULT) { setPriorityLevel(QOS_PRIORITY_RS_TURTLE_SEARCH_RESULT) ;} RsTurtleSearchResultItem(void *data,uint32_t size) ; // deserialization TurtleSearchRequestId request_id ; // Randomly generated request id. @@ -67,7 +67,7 @@ class RsTurtleSearchResultItem: public RsTurtleItem class RsTurtleSearchRequestItem: public RsTurtleItem { public: - RsTurtleSearchRequestItem(uint32_t subtype) : RsTurtleItem(subtype) {} + RsTurtleSearchRequestItem(uint32_t subtype) : RsTurtleItem(subtype) { setPriorityLevel(QOS_PRIORITY_RS_TURTLE_SEARCH_REQUEST) ;} virtual RsTurtleSearchRequestItem *clone() const = 0 ; // used for cloning in routing methods virtual void performLocalSearch(std::list&) const = 0 ; // abstracts the search method @@ -117,7 +117,7 @@ class RsTurtleRegExpSearchRequestItem: public RsTurtleSearchRequestItem class RsTurtleOpenTunnelItem: public RsTurtleItem { public: - RsTurtleOpenTunnelItem() : RsTurtleItem(RS_TURTLE_SUBTYPE_OPEN_TUNNEL) {} + RsTurtleOpenTunnelItem() : RsTurtleItem(RS_TURTLE_SUBTYPE_OPEN_TUNNEL) { setPriorityLevel(QOS_PRIORITY_RS_TURTLE_OPEN_TUNNEL) ;} RsTurtleOpenTunnelItem(void *data,uint32_t size) ; // deserialization TurtleFileHash file_hash ; // hash to match @@ -135,7 +135,7 @@ class RsTurtleOpenTunnelItem: public RsTurtleItem class RsTurtleTunnelOkItem: public RsTurtleItem { public: - RsTurtleTunnelOkItem() : RsTurtleItem(RS_TURTLE_SUBTYPE_TUNNEL_OK) {} + RsTurtleTunnelOkItem() : RsTurtleItem(RS_TURTLE_SUBTYPE_TUNNEL_OK) { setPriorityLevel(QOS_PRIORITY_RS_TURTLE_TUNNEL_OK) ;} RsTurtleTunnelOkItem(void *data,uint32_t size) ; // deserialization uint32_t tunnel_id ; // id of the tunnel. Should be identical for a tunnel between two same peers for the same hash. @@ -155,7 +155,7 @@ class RsTurtleTunnelOkItem: public RsTurtleItem class RsTurtleGenericTunnelItem: public RsTurtleItem { public: - RsTurtleGenericTunnelItem(uint8_t sub_packet_id) : RsTurtleItem(sub_packet_id) {} + RsTurtleGenericTunnelItem(uint8_t sub_packet_id) : RsTurtleItem(sub_packet_id) { setPriorityLevel(QOS_PRIORITY_RS_TURTLE_GENERIC_ITEM);} typedef uint32_t Direction ; static const Direction DIRECTION_CLIENT = 0x001 ; @@ -176,9 +176,6 @@ class RsTurtleGenericTunnelItem: public RsTurtleItem /// requests are server packets, whereas file data are client packets. virtual Direction travelingDirection() const = 0 ; - - /// Generic tunnel items (such as file data) are added into the data queue - virtual RsItem::QueueType queueType() const { return RsItem::DATA_QUEUE ; } }; /***********************************************************************************/ @@ -188,7 +185,7 @@ class RsTurtleGenericTunnelItem: public RsTurtleItem class RsTurtleFileRequestItem: public RsTurtleGenericTunnelItem { public: - RsTurtleFileRequestItem() : RsTurtleGenericTunnelItem(RS_TURTLE_SUBTYPE_FILE_REQUEST) {} + RsTurtleFileRequestItem() : RsTurtleGenericTunnelItem(RS_TURTLE_SUBTYPE_FILE_REQUEST) { setPriorityLevel(QOS_PRIORITY_RS_TURTLE_FILE_REQUEST);} RsTurtleFileRequestItem(void *data,uint32_t size) ; // deserialization virtual bool shouldStampTunnel() const { return false ; } @@ -208,7 +205,7 @@ class RsTurtleFileRequestItem: public RsTurtleGenericTunnelItem class RsTurtleFileDataItem: public RsTurtleGenericTunnelItem { public: - RsTurtleFileDataItem() : RsTurtleGenericTunnelItem(RS_TURTLE_SUBTYPE_FILE_DATA) {} + RsTurtleFileDataItem() : RsTurtleGenericTunnelItem(RS_TURTLE_SUBTYPE_FILE_DATA) { setPriorityLevel(QOS_PRIORITY_RS_TURTLE_FILE_DATA) ;} ~RsTurtleFileDataItem() ; RsTurtleFileDataItem(void *data,uint32_t size) ; // deserialization @@ -230,7 +227,7 @@ class RsTurtleFileDataItem: public RsTurtleGenericTunnelItem class RsTurtleFileMapRequestItem: public RsTurtleGenericTunnelItem { public: - RsTurtleFileMapRequestItem() : RsTurtleGenericTunnelItem(RS_TURTLE_SUBTYPE_FILE_MAP_REQUEST) {} + RsTurtleFileMapRequestItem() : RsTurtleGenericTunnelItem(RS_TURTLE_SUBTYPE_FILE_MAP_REQUEST) { setPriorityLevel(QOS_PRIORITY_RS_TURTLE_FILE_MAP_REQUEST) ;} RsTurtleFileMapRequestItem(void *data,uint32_t size) ; // deserialization virtual bool shouldStampTunnel() const { return false ; } @@ -250,7 +247,7 @@ class RsTurtleFileMapRequestItem: public RsTurtleGenericTunnelItem class RsTurtleFileMapItem: public RsTurtleGenericTunnelItem { public: - RsTurtleFileMapItem() : RsTurtleGenericTunnelItem(RS_TURTLE_SUBTYPE_FILE_MAP) {} + RsTurtleFileMapItem() : RsTurtleGenericTunnelItem(RS_TURTLE_SUBTYPE_FILE_MAP) { setPriorityLevel(QOS_PRIORITY_RS_TURTLE_FILE_MAP) ;} RsTurtleFileMapItem(void *data,uint32_t size) ; // deserialization virtual bool shouldStampTunnel() const { return false ; } @@ -274,7 +271,7 @@ class RsTurtleFileMapItem: public RsTurtleGenericTunnelItem class RsTurtleFileCrcRequestItem: public RsTurtleGenericTunnelItem { public: - RsTurtleFileCrcRequestItem() : RsTurtleGenericTunnelItem(RS_TURTLE_SUBTYPE_FILE_CRC_REQUEST) {} + RsTurtleFileCrcRequestItem() : RsTurtleGenericTunnelItem(RS_TURTLE_SUBTYPE_FILE_CRC_REQUEST) { setPriorityLevel(QOS_PRIORITY_RS_FILE_CRC_REQUEST);} RsTurtleFileCrcRequestItem(void *data,uint32_t size) ; // deserialization virtual bool shouldStampTunnel() const { return false ; } @@ -296,7 +293,7 @@ class RsTurtleFileCrcRequestItem: public RsTurtleGenericTunnelItem class RsTurtleFileCrcItem: public RsTurtleGenericTunnelItem { public: - RsTurtleFileCrcItem() : RsTurtleGenericTunnelItem(RS_TURTLE_SUBTYPE_FILE_CRC) {} + RsTurtleFileCrcItem() : RsTurtleGenericTunnelItem(RS_TURTLE_SUBTYPE_FILE_CRC) { setPriorityLevel(QOS_PRIORITY_RS_FILE_CRC);} RsTurtleFileCrcItem(void *data,uint32_t size) ; // deserialization virtual bool shouldStampTunnel() const { return true ; }