mirror of
https://github.com/RetroShare/RetroShare.git
synced 2025-05-05 07:35:12 -04:00
added very basic display of outqueue statistics in BwCtrlWindow. Removed unnecessary list::size() calls in pqistreamer
git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@7652 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
parent
ec2d6ed2c7
commit
ea5b923428
21 changed files with 345 additions and 48 deletions
|
@ -91,6 +91,7 @@ virtual void getRates(RsBwRates &rates)
|
|||
return;
|
||||
}
|
||||
|
||||
virtual int gatherOutQueueStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) { return 0;}
|
||||
virtual int getQueueSize(bool /* in */) { return 0;}
|
||||
virtual float getRate(bool in)
|
||||
{
|
||||
|
|
|
@ -88,7 +88,27 @@ int pqihandler::tick()
|
|||
#endif
|
||||
moreToTick = 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// static time_t last_print_time = 0 ;
|
||||
// time_t now = time(NULL) ;
|
||||
// if(now > last_print_time + 3)
|
||||
// {
|
||||
// std::map<uint16_t,uint32_t> per_service_count ;
|
||||
// std::vector<uint32_t> per_priority_count ;
|
||||
//
|
||||
// ExtractOutQueueStatistics(per_service_count,per_priority_count) ;
|
||||
//
|
||||
// std::cerr << "PQIHandler outqueues: " << std::endl;
|
||||
//
|
||||
// for(std::map<uint16_t,uint32_t>::const_iterator it=per_service_count.begin();it!=per_service_count.end();++it)
|
||||
// std::cerr << " " << std::hex << it->first << std::dec << " " << it->second << std::endl;
|
||||
//
|
||||
// for(int i=0;i<per_priority_count.size();++i)
|
||||
// std::cerr << " " << i << " : " << per_priority_count[i] << std::endl;
|
||||
//
|
||||
// last_print_time = now ;
|
||||
// }
|
||||
|
||||
UpdateRates();
|
||||
return moreToTick;
|
||||
|
@ -326,9 +346,13 @@ int pqihandler::locked_GetItems()
|
|||
while((item = (mod -> pqi) -> GetItem()) != NULL)
|
||||
{
|
||||
|
||||
static int ntimes =0 ;
|
||||
if(++ntimes < 20)
|
||||
{
|
||||
std::cerr << "pqihandler::locked_GetItems() pqi->GetItem()";
|
||||
std::cerr << " should never happen anymore!";
|
||||
std::cerr << std::endl;
|
||||
}
|
||||
|
||||
#ifdef RSITEM_DEBUG
|
||||
std::string out;
|
||||
|
@ -427,6 +451,26 @@ RsRawItem *pqihandler::GetRsRawItem()
|
|||
|
||||
static const float MIN_RATE = 0.01; // 10 B/s
|
||||
|
||||
int pqihandler::ExtractOutQueueStatistics(OutQueueStatistics& stats)
|
||||
{
|
||||
stats.per_service_item_count.clear() ;
|
||||
|
||||
std::vector<uint32_t> item_counts(65536,0) ;
|
||||
stats.per_priority_item_count.clear() ;
|
||||
stats.per_priority_item_count.resize(10,0) ;
|
||||
|
||||
std::map<RsPeerId, SearchModule *>::iterator it;
|
||||
|
||||
for(it = mods.begin(); it != mods.end(); ++it)
|
||||
(it -> second)->pqi->gatherOutQueueStatistics(item_counts,stats.per_priority_item_count) ;
|
||||
|
||||
for(int i=0;i<65536;++i)
|
||||
if(item_counts[i] > 0)
|
||||
stats.per_service_item_count[i] = item_counts[i] ;
|
||||
|
||||
return 1 ;
|
||||
}
|
||||
|
||||
// NEW extern fn to extract rates.
|
||||
int pqihandler::ExtractRates(std::map<RsPeerId, RsBwRates> &ratemap, RsBwRates &total)
|
||||
{
|
||||
|
|
|
@ -32,6 +32,7 @@
|
|||
|
||||
#include "util/rsthreads.h"
|
||||
#include "retroshare/rstypes.h"
|
||||
#include "retroshare/rsconfig.h"
|
||||
|
||||
#include <map>
|
||||
#include <list>
|
||||
|
@ -77,6 +78,7 @@ class pqihandler: public P3Interface, public pqiPublisher
|
|||
|
||||
// TESTING INTERFACE.
|
||||
int ExtractRates(std::map<RsPeerId, RsBwRates> &ratemap, RsBwRates &totals);
|
||||
int ExtractOutQueueStatistics(OutQueueStatistics& stats) ;
|
||||
|
||||
protected:
|
||||
/* check to be overloaded by those that can
|
||||
|
|
|
@ -573,7 +573,15 @@ void pqiperson::getRates(RsBwRates &rates)
|
|||
return;
|
||||
activepqi -> getRates(rates);
|
||||
}
|
||||
int pqiperson::gatherOutQueueStatistics(std::vector<uint32_t>& per_service,std::vector<uint32_t>& per_priority)
|
||||
{
|
||||
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/
|
||||
|
||||
// get the rate from the active one.
|
||||
if ((!active) || (activepqi == NULL))
|
||||
return 0;
|
||||
return activepqi -> gatherOutQueueStatistics(per_service,per_priority);
|
||||
}
|
||||
int pqiperson::getQueueSize(bool in)
|
||||
{
|
||||
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/
|
||||
|
|
|
@ -170,6 +170,8 @@ virtual void getRates(RsBwRates &rates);
|
|||
virtual float getRate(bool in);
|
||||
virtual void setMaxRate(bool in, float val);
|
||||
virtual void setRateCap(float val_in, float val_out);
|
||||
virtual int gatherOutQueueStatistics(std::vector<uint32_t>& per_service,std::vector<uint32_t>& per_priority);
|
||||
|
||||
|
||||
|
||||
private:
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
#include <list>
|
||||
#include <math.h>
|
||||
#include <serialiser/rsserial.h>
|
||||
#include <serialiser/rsbaseserial.h>
|
||||
|
||||
#include "pqiqos.h"
|
||||
|
||||
|
@ -55,6 +56,30 @@ void pqiQoS::in_rsItem(void *ptr,int priority)
|
|||
++_nb_items ;
|
||||
}
|
||||
|
||||
int pqiQoS::gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const
|
||||
{
|
||||
assert(per_priority_count.size() == 10) ;
|
||||
assert(per_service_count.size() == 65536) ;
|
||||
|
||||
for(uint32_t i=0;i<_item_queues.size();++i)
|
||||
{
|
||||
per_priority_count[i] += _item_queues[i].size() ;
|
||||
|
||||
for(std::list<void*>::const_iterator it(_item_queues[i]._items.begin());it!=_item_queues[i]._items.end();++it)
|
||||
{
|
||||
uint32_t type = 0;
|
||||
uint32_t offset = 0;
|
||||
getRawUInt32((uint8_t*)(*it), 4, &offset, &type);
|
||||
|
||||
uint16_t service_id = (type >> 8) & 0xffff ;
|
||||
|
||||
++per_service_count[service_id] ;
|
||||
}
|
||||
}
|
||||
return 1 ;
|
||||
}
|
||||
|
||||
|
||||
void *pqiQoS::out_rsItem()
|
||||
{
|
||||
// Go through the queues. Increment counters.
|
||||
|
|
|
@ -46,26 +46,35 @@ class pqiQoS
|
|||
|
||||
class ItemQueue
|
||||
{
|
||||
public:
|
||||
public:
|
||||
ItemQueue()
|
||||
{
|
||||
_item_count =0 ;
|
||||
}
|
||||
void *pop()
|
||||
{
|
||||
if(_items.empty())
|
||||
return NULL ;
|
||||
|
||||
void *item = _items.front() ;
|
||||
_items.pop_front() ;
|
||||
_items.pop_front() ;
|
||||
--_item_count ;
|
||||
|
||||
return item ;
|
||||
}
|
||||
|
||||
void push(void *item)
|
||||
{
|
||||
_items.push_back(item) ;
|
||||
}
|
||||
_items.push_back(item) ;
|
||||
++_item_count ;
|
||||
}
|
||||
|
||||
uint32_t size() const { return _item_count ; }
|
||||
|
||||
float _threshold ;
|
||||
float _counter ;
|
||||
float _inc ;
|
||||
float _inc ;
|
||||
uint32_t _item_count ;
|
||||
std::list<void*> _items ;
|
||||
};
|
||||
|
||||
|
@ -83,6 +92,11 @@ class pqiQoS
|
|||
// kills all waiting items.
|
||||
void clear() ;
|
||||
|
||||
// get some stats about what's going on. service_packets will contain the number of
|
||||
// packets per service, and queue_sizes will contain the size of the different priority queues.
|
||||
|
||||
int gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const ;
|
||||
|
||||
void computeTotalItemSize() const ;
|
||||
int debug_computeTotalItemSize() const ;
|
||||
private:
|
||||
|
|
|
@ -43,6 +43,11 @@ int pqiQoSstreamer::getQueueSize(bool in)
|
|||
}
|
||||
}
|
||||
|
||||
int pqiQoSstreamer::locked_gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const // extracting data.
|
||||
{
|
||||
return pqiQoS::gatherStatistics(per_service_count,per_priority_count) ;
|
||||
}
|
||||
|
||||
void pqiQoSstreamer::locked_storeInOutputQueue(void *ptr,int priority)
|
||||
{
|
||||
_total_item_size += getRsItemSize(ptr) ;
|
||||
|
|
|
@ -40,7 +40,9 @@ class pqiQoSstreamer: public pqithreadstreamer, public pqiQoS
|
|||
virtual int locked_out_queue_size() const { return _total_item_count ; }
|
||||
virtual void locked_clear_out_queue() ;
|
||||
virtual int locked_compute_out_pkt_size() const { return _total_item_size ; }
|
||||
virtual void *locked_pop_out_data() ;
|
||||
virtual void *locked_pop_out_data() ;
|
||||
virtual int locked_gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const; // extracting data.
|
||||
|
||||
|
||||
virtual int getQueueSize(bool in) ;
|
||||
|
||||
|
|
|
@ -61,7 +61,8 @@ pqistreamer::pqistreamer(RsSerialiser *rss, const RsPeerId& id, BinInterface *bi
|
|||
{
|
||||
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
|
||||
|
||||
mAvgLastUpdate = mCurrReadTS = mCurrSentTS = time(NULL);
|
||||
mAvgLastUpdate = mCurrReadTS = mCurrSentTS = time(NULL);
|
||||
mIncomingSize = 0 ;
|
||||
|
||||
/* allocated once */
|
||||
mPkt_rpend_size = getRsPktMaxSize();
|
||||
|
@ -123,12 +124,16 @@ pqistreamer::~pqistreamer()
|
|||
free(mPkt_rpending);
|
||||
|
||||
// clean up incoming.
|
||||
while(mIncoming.size() > 0)
|
||||
while(!mIncoming.empty())
|
||||
{
|
||||
RsItem *i = mIncoming.front();
|
||||
mIncoming.pop_front();
|
||||
mIncoming.pop_front();
|
||||
--mIncomingSize ;
|
||||
delete i;
|
||||
}
|
||||
}
|
||||
|
||||
if(mIncomingSize != 0)
|
||||
std::cerr << "(EE) inconsistency after deleting pqistreamer queue. Remaining items: " << mIncomingSize << std::endl;
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -162,7 +167,8 @@ RsItem *pqistreamer::GetItem()
|
|||
return NULL;
|
||||
|
||||
RsItem *osr = mIncoming.front() ;
|
||||
mIncoming.pop_front() ;
|
||||
mIncoming.pop_front() ;
|
||||
--mIncomingSize;
|
||||
|
||||
return osr;
|
||||
}
|
||||
|
@ -211,7 +217,7 @@ int pqistreamer::tick()
|
|||
int total = locked_compute_out_pkt_size() ;
|
||||
|
||||
rs_sprintf_append(out, "\t Out Packets [%d] => %d bytes\n", locked_out_queue_size(), total);
|
||||
rs_sprintf_append(out, "\t Incoming [%d]\n", mIncoming.size());
|
||||
rs_sprintf_append(out, "\t Incoming [%d]\n", mIncomingSize);
|
||||
}
|
||||
|
||||
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out);
|
||||
|
@ -219,7 +225,7 @@ int pqistreamer::tick()
|
|||
#endif
|
||||
|
||||
/* if there is more stuff in the queues */
|
||||
if ((mIncoming.size() > 0) || (locked_out_queue_size() > 0))
|
||||
if ((!mIncoming.empty()) || (locked_out_queue_size() > 0))
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
|
@ -349,7 +355,8 @@ int pqistreamer::handleincomingitem_locked(RsItem *pqi)
|
|||
|
||||
// Use overloaded Contact function
|
||||
pqi -> PeerId(PeerId());
|
||||
mIncoming.push_back(pqi);
|
||||
mIncoming.push_back(pqi);
|
||||
++mIncomingSize ;
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
@ -953,13 +960,21 @@ void pqistreamer::inReadBytes_locked(int inb)
|
|||
return;
|
||||
}
|
||||
|
||||
int pqistreamer::gatherOutQueueStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count)
|
||||
{
|
||||
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
|
||||
|
||||
return locked_gatherStatistics(per_service_count,per_priority_count);
|
||||
}
|
||||
|
||||
int pqistreamer::getQueueSize(bool in)
|
||||
{
|
||||
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
|
||||
|
||||
if (in)
|
||||
return mIncoming.size();
|
||||
return locked_out_queue_size();
|
||||
return mIncomingSize;
|
||||
else
|
||||
return locked_out_queue_size();
|
||||
}
|
||||
|
||||
void pqistreamer::getRates(RsBwRates &rates)
|
||||
|
@ -968,7 +983,7 @@ void pqistreamer::getRates(RsBwRates &rates)
|
|||
|
||||
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
|
||||
|
||||
rates.mQueueIn = mIncoming.size();
|
||||
rates.mQueueIn = mIncomingSize;
|
||||
rates.mQueueOut = locked_out_queue_size();
|
||||
}
|
||||
|
||||
|
@ -1005,6 +1020,12 @@ int pqistreamer::locked_compute_out_pkt_size() const
|
|||
return total ;
|
||||
}
|
||||
|
||||
int pqistreamer::locked_gatherStatistics(std::vector<uint32_t>&,std::vector<uint32_t>&) const
|
||||
{
|
||||
std::cerr << "(II) called overloaded function locked_gatherStatistics(). This is probably an error" << std::endl;
|
||||
return 1 ;
|
||||
}
|
||||
|
||||
void *pqistreamer::locked_pop_out_data()
|
||||
{
|
||||
void *res = NULL ;
|
||||
|
|
|
@ -62,7 +62,8 @@ class pqistreamer: public PQInterface
|
|||
time_t getLastIncomingTS(); // Time of last data packet, for checking a connection is alive.
|
||||
virtual void getRates(RsBwRates &rates);
|
||||
virtual int getQueueSize(bool in); // extracting data.
|
||||
protected:
|
||||
virtual int gatherOutQueueStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count); // extracting data.
|
||||
protected:
|
||||
|
||||
int tick_bio();
|
||||
int tick_send(uint32_t timeout);
|
||||
|
@ -77,6 +78,7 @@ class pqistreamer: public PQInterface
|
|||
virtual void locked_clear_out_queue() ;
|
||||
virtual int locked_compute_out_pkt_size() const ;
|
||||
virtual void *locked_pop_out_data() ;
|
||||
virtual int locked_gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const; // extracting data.
|
||||
|
||||
|
||||
protected:
|
||||
|
@ -123,6 +125,8 @@ class pqistreamer: public PQInterface
|
|||
std::list<void *> mOutPkts; // Cntrl / Search / Results queue
|
||||
std::list<RsItem *> mIncoming;
|
||||
|
||||
uint32_t mIncomingSize; // size of mIncoming. To avoid calling linear cost std::list::size()
|
||||
|
||||
// data for network stats.
|
||||
int mTotalRead;
|
||||
int mTotalSent;
|
||||
|
|
|
@ -171,6 +171,12 @@ class RsConfigDataRates
|
|||
int mQueueOut;
|
||||
};
|
||||
|
||||
class OutQueueStatistics
|
||||
{
|
||||
public:
|
||||
std::map<uint16_t,uint32_t> per_service_item_count ;
|
||||
std::vector<uint32_t> per_priority_item_count ;
|
||||
};
|
||||
|
||||
class RsConfigNetStatus
|
||||
{
|
||||
|
@ -240,7 +246,7 @@ virtual int getConfigNetStatus(RsConfigNetStatus &status) = 0;
|
|||
|
||||
virtual int getTotalBandwidthRates(RsConfigDataRates &rates) = 0;
|
||||
virtual int getAllBandwidthRates(std::map<RsPeerId, RsConfigDataRates> &ratemap) = 0;
|
||||
|
||||
virtual int getOutQueueStatistics(OutQueueStatistics& stats) = 0 ;
|
||||
/* From RsInit */
|
||||
|
||||
// NOT IMPLEMENTED YET!
|
||||
|
|
|
@ -205,7 +205,13 @@ int p3ServerConfig::getTotalBandwidthRates(RsConfigDataRates &rates)
|
|||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int p3ServerConfig::getOutQueueStatistics(OutQueueStatistics& stats)
|
||||
{
|
||||
if (rsBandwidthControl)
|
||||
return rsBandwidthControl->ExtractOutQueueStatistics(stats);
|
||||
else
|
||||
return 0;
|
||||
}
|
||||
int p3ServerConfig::getAllBandwidthRates(std::map<RsPeerId, RsConfigDataRates> &ratemap)
|
||||
{
|
||||
if (rsBandwidthControl)
|
||||
|
|
|
@ -58,6 +58,7 @@ virtual int getConfigStartup(RsConfigStartup ¶ms);
|
|||
|
||||
virtual int getTotalBandwidthRates(RsConfigDataRates &rates);
|
||||
virtual int getAllBandwidthRates(std::map<RsPeerId, RsConfigDataRates> &ratemap);
|
||||
virtual int getOutQueueStatistics(OutQueueStatistics& stats) ;
|
||||
|
||||
/* From RsInit */
|
||||
|
||||
|
|
|
@ -280,6 +280,11 @@ int p3BandwidthControl::getAllBandwidthRates(std::map<RsPeerId, RsConfigDataRate
|
|||
|
||||
}
|
||||
|
||||
int p3BandwidthControl::ExtractOutQueueStatistics(OutQueueStatistics& stats)
|
||||
{
|
||||
return mPg->ExtractOutQueueStatistics(stats) ;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -94,6 +94,7 @@ class p3BandwidthControl: public p3Service, public pqiServiceMonitor
|
|||
virtual int getAllBandwidthRates(std::map<RsPeerId, RsConfigDataRates> &ratemap);
|
||||
|
||||
|
||||
virtual int ExtractOutQueueStatistics(OutQueueStatistics& stats) ;
|
||||
|
||||
/*!
|
||||
* Interface stuff.
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue