added basic functions to collect bandwidth info in pqistreamer both ways; added a sorting method in BWGraphSource to create curves from extracted BW info. Still not yet functional

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@8600 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2015-07-12 04:04:18 +00:00
parent 090148cea2
commit 9471a91795
23 changed files with 551 additions and 94 deletions

View file

@ -37,6 +37,8 @@
#include "pqi/pqinetwork.h"
class RSTrafficClue ;
/*** Base DataTypes: ****/
#include "serialiser/rsserial.h"
#include "retroshare/rstypes.h"
@ -93,6 +95,8 @@ virtual void getRates(RsBwRates &rates)
}
virtual int gatherOutQueueStatistics(std::vector<uint32_t>& /*per_service_count*/,std::vector<uint32_t>& /*per_priority_count*/) { return 0;}
virtual int gatherStatistics(std::list<RSTrafficClue>& /* outqueue_lst */,std::list<RSTrafficClue>& /* inqueue_lst */) { return 0;}
virtual int getQueueSize(bool /* in */) { return 0;}
virtual float getRate(bool in)
{

View file

@ -407,6 +407,17 @@ int pqihandler::ExtractOutQueueStatistics(OutQueueStatistics& stats)
return 1 ;
}
int pqihandler::ExtractTrafficInfo(std::list<RSTrafficClue>& out_lst,std::list<RSTrafficClue>& in_lst)
{
in_lst.clear() ;
out_lst.clear() ;
for( std::map<RsPeerId, SearchModule *>::iterator it = mods.begin(); it != mods.end(); ++it)
(it -> second)->pqi->gatherStatistics(out_lst,in_lst) ;
return 1 ;
}
// NEW extern fn to extract rates.
int pqihandler::ExtractRates(std::map<RsPeerId, RsBwRates> &ratemap, RsBwRates &total)
{

View file

@ -79,8 +79,9 @@ class pqihandler: public P3Interface, public pqiPublisher
// TESTING INTERFACE.
int ExtractRates(std::map<RsPeerId, RsBwRates> &ratemap, RsBwRates &totals);
int ExtractOutQueueStatistics(OutQueueStatistics& stats) ;
int ExtractTrafficInfo(std::list<RSTrafficClue> &out_lst, std::list<RSTrafficClue> &in_lst);
protected:
protected:
/* check to be overloaded by those that can
* generates warnings otherwise
*/

View file

@ -601,6 +601,17 @@ int pqiperson::gatherOutQueueStatistics(std::vector<uint32_t>& per_service,s
return 0;
return activepqi -> gatherOutQueueStatistics(per_service,per_priority);
}
int pqiperson::gatherStatistics(std::list<RSTrafficClue>& out_lst,std::list<RSTrafficClue>& in_lst)
{
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/
// get the rate from the active one.
if ((!active) || (activepqi == NULL))
return 0;
return activepqi -> gatherStatistics(out_lst,in_lst);
}
int pqiperson::getQueueSize(bool in)
{
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/

View file

@ -171,6 +171,7 @@ virtual float getRate(bool in);
virtual void setMaxRate(bool in, float val);
virtual void setRateCap(float val_in, float val_out);
virtual int gatherOutQueueStatistics(std::vector<uint32_t>& per_service,std::vector<uint32_t>& per_priority);
virtual int gatherStatistics(std::list<RSTrafficClue>& outqueue_lst,std::list<RSTrafficClue>& inqueue_lst) ;

View file

@ -56,28 +56,28 @@ void pqiQoS::in_rsItem(void *ptr,int priority)
++_nb_items ;
}
int pqiQoS::gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const
{
assert(per_priority_count.size() == 10) ;
assert(per_service_count.size() == 65536) ;
for(uint32_t i=0;i<_item_queues.size();++i)
{
per_priority_count[i] += _item_queues[i].size() ;
for(std::list<void*>::const_iterator it(_item_queues[i]._items.begin());it!=_item_queues[i]._items.end();++it)
{
uint32_t type = 0;
uint32_t offset = 0;
getRawUInt32((uint8_t*)(*it), 4, &offset, &type);
uint16_t service_id = (type >> 8) & 0xffff ;
++per_service_count[service_id] ;
}
}
return 1 ;
}
// int pqiQoS::gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const
// {
// assert(per_priority_count.size() == 10) ;
// assert(per_service_count.size() == 65536) ;
//
// for(uint32_t i=0;i<_item_queues.size();++i)
// {
// per_priority_count[i] += _item_queues[i].size() ;
//
// for(std::list<void*>::const_iterator it(_item_queues[i]._items.begin());it!=_item_queues[i]._items.end();++it)
// {
// uint32_t type = 0;
// uint32_t offset = 0;
// getRawUInt32((uint8_t*)(*it), 4, &offset, &type);
//
// uint16_t service_id = (type >> 8) & 0xffff ;
//
// ++per_service_count[service_id] ;
// }
// }
// return 1 ;
// }
void *pqiQoS::out_rsItem()

View file

@ -95,7 +95,7 @@ class pqiQoS
// get some stats about what's going on. service_packets will contain the number of
// packets per service, and queue_sizes will contain the size of the different priority queues.
int gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const ;
//int gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const ;
void computeTotalItemSize() const ;
int debug_computeTotalItemSize() const ;

View file

@ -43,10 +43,10 @@ int pqiQoSstreamer::getQueueSize(bool in)
}
}
int pqiQoSstreamer::locked_gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const // extracting data.
{
return pqiQoS::gatherStatistics(per_service_count,per_priority_count) ;
}
//int pqiQoSstreamer::locked_gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const // extracting data.
//{
// return pqiQoS::gatherStatistics(per_service_count,per_priority_count) ;
//}
void pqiQoSstreamer::locked_storeInOutputQueue(void *ptr,int priority)
{

View file

@ -41,7 +41,7 @@ class pqiQoSstreamer: public pqithreadstreamer, public pqiQoS
virtual void locked_clear_out_queue() ;
virtual int locked_compute_out_pkt_size() const { return _total_item_size ; }
virtual void *locked_pop_out_data() ;
virtual int locked_gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const; // extracting data.
//virtual int locked_gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const; // extracting data.
virtual int getQueueSize(bool in) ;

View file

@ -318,6 +318,15 @@ int pqistreamer::queue_outpqi_locked(RsItem *pqi,uint32_t& pktsize)
#ifdef DEBUG_PQISTREAMER
std::cerr << "pqistreamer::queue_outpqi() serializing packet with packet size : " << pktsize << std::endl;
#endif
/*******************************************************************************************/
// keep info for stats for a while. Only keep the items for the last two seconds. sec n is ongoing and second n-1
// is a full statistics chunk that can be used in the GUI
locked_addTrafficClue(pqi,pktsize,mCurrentStatsChunk_Out) ;
/*******************************************************************************************/
if (mRsSerialiser->serialise(pqi, ptr, &pktsize))
{
locked_storeInOutputQueue(ptr,pqi->priority_level()) ;
@ -345,7 +354,7 @@ int pqistreamer::queue_outpqi_locked(RsItem *pqi,uint32_t& pktsize)
return 1; // keep error internal.
}
int pqistreamer::handleincomingitem_locked(RsItem *pqi)
int pqistreamer::handleincomingitem_locked(RsItem *pqi,int len)
{
#ifdef DEBUG_PQISTREAMER
@ -358,9 +367,44 @@ int pqistreamer::handleincomingitem_locked(RsItem *pqi)
pqi -> PeerId(PeerId());
mIncoming.push_back(pqi);
++mIncomingSize ;
/*******************************************************************************************/
// keep info for stats for a while. Only keep the items for the last two seconds. sec n is ongoing and second n-1
// is a full statistics chunk that can be used in the GUI
locked_addTrafficClue(pqi,len,mCurrentStatsChunk_In) ;
/*******************************************************************************************/
return 1;
}
void pqistreamer::locked_addTrafficClue(const RsItem *pqi,uint32_t pktsize,std::list<RSTrafficClue>& lst)
{
time_t now = time(NULL) ;
if(now > mStatisticsTimeStamp) // new chunk => get rid of oldest, replace old list by current list, clear current list.
{
mPreviousStatsChunk_Out = mCurrentStatsChunk_Out ;
mPreviousStatsChunk_In = mCurrentStatsChunk_In ;
mCurrentStatsChunk_Out.clear() ;
mCurrentStatsChunk_In.clear() ;
mStatisticsTimeStamp = now ;
}
RSTrafficClue tc ;
tc.TS = now ;
tc.size = pktsize ;
tc.priority = pqi->priority_level() ;
tc.peer_id = pqi->PeerId() ;
tc.count = 1 ;
tc.service_id = pqi->PacketService() ;
tc.service_sub_id = pqi->PacketSubType() ;
lst.push_back(tc) ;
}
time_t pqistreamer::getLastIncomingTS()
{
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
@ -742,16 +786,9 @@ continue_packet:
std::cerr << "[" << (void*)pthread_self() << "] " << "deserializing. Size=" << pktlen << std::endl ;
#endif
// if(pktlen == 17306)
// {
// FILE *f = RsDirUtil::rs_fopen("dbug.packet.bin","w");
// fwrite(block,pktlen,1,f) ;
// fclose(f) ;
// exit(-1) ;
// }
RsItem *pkt = mRsSerialiser->deserialise(block, &pktlen);
if ((pkt != NULL) && (0 < handleincomingitem_locked(pkt)))
if ((pkt != NULL) && (0 < handleincomingitem_locked(pkt,pktlen)))
{
#ifdef DEBUG_PQISTREAMER
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "Successfully Read a Packet!");
@ -988,11 +1025,19 @@ void pqistreamer::free_rpend_locked()
int pqistreamer::gatherOutQueueStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count)
{
#ifdef TO_REMOVE
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
return locked_gatherStatistics(per_service_count,per_priority_count);
#endif
}
int pqistreamer::gatherStatistics(std::list<RSTrafficClue>& outqueue_lst,std::list<RSTrafficClue>& inqueue_lst)
{
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
return locked_gatherStatistics(outqueue_lst,inqueue_lst);
}
int pqistreamer::getQueueSize(bool in)
{
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
@ -1046,9 +1091,9 @@ int pqistreamer::locked_compute_out_pkt_size() const
return total ;
}
int pqistreamer::locked_gatherStatistics(std::vector<uint32_t>&,std::vector<uint32_t>&) const
int pqistreamer::locked_gatherStatistics(std::list<RSTrafficClue>& out_lst,std::list<RSTrafficClue>& in_lst)
{
std::cerr << "(II) called overloaded function locked_gatherStatistics(). This is probably an error" << std::endl;
// std::cerr << "(II) called overloaded function pqistreamer::locked_gatherStatistics(). " << std::endl;
return 1 ;
}

View file

@ -62,7 +62,8 @@ class pqistreamer: public PQInterface
time_t getLastIncomingTS(); // Time of last data packet, for checking a connection is alive.
virtual void getRates(RsBwRates &rates);
virtual int getQueueSize(bool in); // extracting data.
virtual int gatherOutQueueStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count); // extracting data.
virtual int gatherOutQueueStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count); // extracting data.
virtual int gatherStatistics(std::list<RSTrafficClue>& outqueue_stats,std::list<RSTrafficClue>& inqueue_stats); // extracting data.
protected:
int tick_bio();
@ -78,7 +79,8 @@ class pqistreamer: public PQInterface
virtual void locked_clear_out_queue() ;
virtual int locked_compute_out_pkt_size() const ;
virtual void *locked_pop_out_data() ;
virtual int locked_gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const; // extracting data.
//virtual int locked_gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const; // extracting data.
virtual int locked_gatherStatistics(std::list<RSTrafficClue>& outqueue_stats,std::list<RSTrafficClue>& inqueue_stats); // extracting data.
protected:
@ -90,7 +92,7 @@ class pqistreamer: public PQInterface
private:
int queue_outpqi_locked(RsItem *i,uint32_t& serialized_size);
int handleincomingitem_locked(RsItem *i);
int handleincomingitem_locked(RsItem *i, int len);
// ticked regularly (manages out queues and sending
// via above interfaces.
@ -146,7 +148,15 @@ class pqistreamer: public PQInterface
time_t mLastIncomingTs;
// traffic statistics
std::list<RSTrafficClue> mPreviousStatsChunk_In ;
std::list<RSTrafficClue> mPreviousStatsChunk_Out ;
std::list<RSTrafficClue> mCurrentStatsChunk_In ;
std::list<RSTrafficClue> mCurrentStatsChunk_Out ;
time_t mStatisticsTimeStamp ;
void locked_addTrafficClue(const RsItem *pqi, uint32_t pktsize, std::list<RSTrafficClue> &lst);
};
#endif //MRK_PQI_STREAMER_HEADER

View file

@ -171,6 +171,21 @@ class RsConfigDataRates
int mQueueOut;
};
class RSTrafficClue
{
public:
time_t TS ;
uint32_t size ;
uint8_t priority ;
uint16_t service_id ;
uint8_t service_sub_id ;
RsPeerId peer_id ;
uint32_t count ;
RSTrafficClue() { TS=0;size=0;service_id=0;service_sub_id=0; count=0; }
RSTrafficClue& operator+=(const RSTrafficClue& tc) { size += tc.size; count += tc.count ; return *this ;}
};
class OutQueueStatistics
{
public:
@ -232,59 +247,61 @@ class RsConfigNetStatus
class RsServerConfig
{
public:
public:
RsServerConfig() { return; }
virtual ~RsServerConfig() { return; }
RsServerConfig() {}
virtual ~RsServerConfig() {}
/* From RsIface::RsConfig */
// Implemented Only this one!
virtual int getConfigNetStatus(RsConfigNetStatus &status) = 0;
/* From RsIface::RsConfig */
// Implemented Only this one!
virtual int getConfigNetStatus(RsConfigNetStatus &status) = 0;
// NOT IMPLEMENTED YET!
//virtual int getConfigStartup(RsConfigStartup &params) = 0;
// NOT IMPLEMENTED YET!
//virtual int getConfigStartup(RsConfigStartup &params) = 0;
virtual int getTotalBandwidthRates(RsConfigDataRates &rates) = 0;
virtual int getAllBandwidthRates(std::map<RsPeerId, RsConfigDataRates> &ratemap) = 0;
virtual int getOutQueueStatistics(OutQueueStatistics& stats) = 0 ;
/* From RsInit */
virtual int getTotalBandwidthRates(RsConfigDataRates &rates) = 0;
virtual int getAllBandwidthRates(std::map<RsPeerId, RsConfigDataRates> &ratemap) = 0;
virtual int getOutQueueStatistics(OutQueueStatistics& stats) = 0 ;
virtual int getTrafficInfo(std::list<RSTrafficClue>& out_lst,std::list<RSTrafficClue>& in_lst) = 0 ;
// NOT IMPLEMENTED YET!
//virtual std::string RsConfigDirectory() = 0;
//virtual std::string RsConfigKeysDirectory() = 0;
/* From RsInit */
//virtual std::string RsProfileConfigDirectory() = 0;
//virtual bool getStartMinimised() = 0;
//virtual std::string getRetroShareLink() = 0;
// NOT IMPLEMENTED YET!
//virtual std::string RsConfigDirectory() = 0;
//virtual std::string RsConfigKeysDirectory() = 0;
//virtual bool getAutoLogin() = 0;
//virtual void setAutoLogin(bool autoLogin) = 0;
//virtual bool RsClearAutoLogin() = 0;
//virtual std::string RsProfileConfigDirectory() = 0;
//virtual bool getStartMinimised() = 0;
//virtual std::string getRetroShareLink() = 0;
//virtual std::string getRetroshareDataDirectory() = 0;
//virtual bool getAutoLogin() = 0;
//virtual void setAutoLogin(bool autoLogin) = 0;
//virtual bool RsClearAutoLogin() = 0;
/* New Stuff */
//virtual std::string getRetroshareDataDirectory() = 0;
virtual uint32_t getUserLevel() = 0;
/* New Stuff */
virtual uint32_t getNetState() = 0;
virtual uint32_t getNetworkMode() = 0;
virtual uint32_t getNatTypeMode() = 0;
virtual uint32_t getNatHoleMode() = 0;
virtual uint32_t getConnectModes() = 0;
virtual uint32_t getUserLevel() = 0;
virtual bool getConfigurationOption(uint32_t key, std::string &opt) = 0;
virtual bool setConfigurationOption(uint32_t key, const std::string &opt) = 0;
virtual uint32_t getNetState() = 0;
virtual uint32_t getNetworkMode() = 0;
virtual uint32_t getNatTypeMode() = 0;
virtual uint32_t getNatHoleMode() = 0;
virtual uint32_t getConnectModes() = 0;
virtual bool getConfigurationOption(uint32_t key, std::string &opt) = 0;
virtual bool setConfigurationOption(uint32_t key, const std::string &opt) = 0;
/* Operating Mode */
virtual uint32_t getOperatingMode() = 0;
virtual bool setOperatingMode(uint32_t opMode) = 0;
/* Operating Mode */
virtual uint32_t getOperatingMode() = 0;
virtual bool setOperatingMode(uint32_t opMode) = 0;
/* Data Rate Control - to be moved here */
virtual int SetMaxDataRates( int downKb, int upKb ) = 0;
virtual int GetMaxDataRates( int &inKb, int &outKb ) = 0;
virtual int GetCurrentDataRates( float &inKb, float &outKb ) = 0;
/* Data Rate Control - to be moved here */
virtual int SetMaxDataRates( int downKb, int upKb ) = 0;
virtual int GetMaxDataRates( int &inKb, int &outKb ) = 0;
virtual int GetCurrentDataRates( float &inKb, float &outKb ) = 0;
};

View file

@ -220,7 +220,6 @@ static const uint32_t RS_GENERIC_ID_GXS_CIRCLE_ID_TYPE = 0x0008 ;
static const uint32_t RS_GENERIC_ID_GROUTER_ID_TYPE = 0x0009 ;
typedef t_RsGenericIdType< SSL_ID_SIZE , false, RS_GENERIC_ID_SSL_ID_TYPE> SSLIdType ;
//typedef t_RsGenericIdType< SSL_ID_SIZE , false, RS_GENERIC_ID_GROUTER_ID_TYPE> GRouterKeyIdType ;
typedef t_RsGenericIdType< PGP_KEY_ID_SIZE , true, RS_GENERIC_ID_PGP_ID_TYPE> PGPIdType ;
typedef t_RsGenericIdType< SHA1_SIZE , false, RS_GENERIC_ID_SHA1_ID_TYPE> Sha1CheckSum ;
typedef t_RsGenericIdType< PGP_KEY_FINGERPRINT_SIZE, true, RS_GENERIC_ID_PGP_FINGERPRINT_TYPE> PGPFingerprintType ;

View file

@ -187,14 +187,16 @@ int p3ServerConfig::getConfigStartup(RsConfigStartup &/*params*/)
return 0;
}
#if 0
int p3ServerConfig::getConfigDataRates(RsConfigDataRates &/*params*/)
{
return 0;
}
#endif
/***** for RsConfig -> p3BandwidthControl ****/
/***** for RsConfig -> p3BandwidthControl ****/
int p3ServerConfig::getTrafficInfo(std::list<RSTrafficClue>& out_lst,std::list<RSTrafficClue>& in_lst)
{
if (rsBandwidthControl)
return rsBandwidthControl->ExtractTrafficInfo(out_lst,in_lst);
else
return 0 ;
}
int p3ServerConfig::getTotalBandwidthRates(RsConfigDataRates &rates)
{

View file

@ -59,6 +59,7 @@ virtual int getConfigStartup(RsConfigStartup &params);
virtual int getTotalBandwidthRates(RsConfigDataRates &rates);
virtual int getAllBandwidthRates(std::map<RsPeerId, RsConfigDataRates> &ratemap);
virtual int getOutQueueStatistics(OutQueueStatistics& stats) ;
virtual int getTrafficInfo(std::list<RSTrafficClue>& out_lst, std::list<RSTrafficClue> &in_lst) ;
/* From RsInit */

View file

@ -285,6 +285,11 @@ int p3BandwidthControl::ExtractOutQueueStatistics(OutQueueStatistics& stats)
return mPg->ExtractOutQueueStatistics(stats) ;
}
int p3BandwidthControl::ExtractTrafficInfo(std::list<RSTrafficClue>& in_stats,std::list<RSTrafficClue>& out_stats)
{
return mPg->ExtractTrafficInfo(out_stats,in_stats) ;
}

View file

@ -95,6 +95,7 @@ class p3BandwidthControl: public p3Service, public pqiServiceMonitor
virtual int ExtractOutQueueStatistics(OutQueueStatistics& stats) ;
virtual int ExtractTrafficInfo(std::list<RSTrafficClue> &in_stats, std::list<RSTrafficClue> &out_stats);
/*!
* Interface stuff.
@ -110,8 +111,7 @@ class p3BandwidthControl: public p3Service, public pqiServiceMonitor
//virtual void saveDone();
//virtual bool loadList(std::list<RsItem*>& load) ;
private:
private:
bool checkAvailableBandwidth();
bool processIncoming();