added mutex protection around pqistreamer::getRates, since float r/w are not necessarily atomic

This commit is contained in:
csoler 2016-06-11 09:33:11 -04:00
parent eba90a83c6
commit 0d1d31a25f
3 changed files with 105 additions and 82 deletions

View file

@ -80,103 +80,103 @@ class RateInterface
public: public:
RateInterface() RateInterface()
:bw_in(0), bw_out(0), bwMax_in(0), bwMax_out(0), :bw_in(0), bw_out(0), bwMax_in(0), bwMax_out(0),
bwCapEnabled(false), bwCap_in(0), bwCap_out(0) { return; } bwCapEnabled(false), bwCap_in(0), bwCap_out(0) { return; }
virtual ~RateInterface() { return; } virtual ~RateInterface() { return; }
virtual void getRates(RsBwRates &rates) virtual void getRates(RsBwRates &rates)
{
rates.mRateIn = bw_in;
rates.mRateOut = bw_out;
rates.mMaxRateIn = bwMax_in;
rates.mMaxRateOut = bwMax_out;
return;
}
virtual int gatherStatistics(std::list<RSTrafficClue>& /* outqueue_lst */,std::list<RSTrafficClue>& /* inqueue_lst */) { return 0;}
virtual int getQueueSize(bool /* in */) { return 0;}
virtual float getRate(bool in)
{ {
if (in) rates.mRateIn = bw_in;
return bw_in; rates.mRateOut = bw_out;
return bw_out; rates.mMaxRateIn = bwMax_in;
rates.mMaxRateOut = bwMax_out;
return;
} }
virtual float getMaxRate(bool in) virtual int gatherStatistics(std::list<RSTrafficClue>& /* outqueue_lst */,std::list<RSTrafficClue>& /* inqueue_lst */) { return 0;}
virtual int getQueueSize(bool /* in */) { return 0;}
virtual float getRate(bool in)
{ {
if (in) if (in)
return bwMax_in; return bw_in;
return bwMax_out; return bw_out;
} }
virtual void setMaxRate(bool in, float val) virtual float getMaxRate(bool in)
{ {
if (in) if (in)
return bwMax_in;
return bwMax_out;
}
virtual void setMaxRate(bool in, float val)
{ {
bwMax_in = val; if (in)
if (bwCapEnabled)
{ {
if (bwMax_in > bwCap_in) bwMax_in = val;
if (bwCapEnabled)
{ {
bwMax_in = bwCap_in; if (bwMax_in > bwCap_in)
{
bwMax_in = bwCap_in;
}
} }
} }
} else
else
{
bwMax_out = val;
if (bwCapEnabled)
{ {
if (bwMax_out > bwCap_out) bwMax_out = val;
if (bwCapEnabled)
{ {
bwMax_out = bwCap_out; if (bwMax_out > bwCap_out)
{
bwMax_out = bwCap_out;
}
} }
} }
}
return; return;
} }
virtual void setRateCap(float val_in, float val_out) virtual void setRateCap(float val_in, float val_out)
{
if ((val_in == 0) && (val_out == 0))
{ {
if ((val_in == 0) && (val_out == 0))
{
#ifdef DEBUG_RATECAP #ifdef DEBUG_RATECAP
std::cerr << "RateInterface::setRateCap() Now disabled" << std::endl; std::cerr << "RateInterface::setRateCap() Now disabled" << std::endl;
#endif #endif
bwCapEnabled = false; bwCapEnabled = false;
} }
else else
{ {
#ifdef DEBUG_RATECAP #ifdef DEBUG_RATECAP
std::cerr << "RateInterface::setRateCap() Enabled "; std::cerr << "RateInterface::setRateCap() Enabled ";
std::cerr << "in: " << bwCap_in << " out: " << bwCap_out << std::endl; std::cerr << "in: " << bwCap_in << " out: " << bwCap_out << std::endl;
#endif #endif
bwCapEnabled = true; bwCapEnabled = true;
bwCap_in = val_in; bwCap_in = val_in;
bwCap_out = val_out; bwCap_out = val_out;
}
return;
} }
return;
}
protected: protected:
void setRate(bool in, float val) virtual void setRate(bool in, float val)
{ {
if (in) if (in)
bw_in = val; bw_in = val;
else else
bw_out = val; bw_out = val;
return; return;
} }
private: private:
float bw_in, bw_out, bwMax_in, bwMax_out; float bw_in, bw_out, bwMax_in, bwMax_out;
bool bwCapEnabled; bool bwCapEnabled;
float bwCap_in, bwCap_out; float bwCap_in, bwCap_out;
}; };

View file

@ -77,37 +77,38 @@ pqistreamer::pqistreamer(RsSerialiser *rss, const RsPeerId& id, BinInterface *bi
mCurrRead(0), mCurrSent(0), mCurrRead(0), mCurrSent(0),
mAvgReadCount(0), mAvgSentCount(0) mAvgReadCount(0), mAvgSentCount(0)
{ {
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
// 100 B/s (minimal)
setMaxRate(true, 0.1);
setMaxRate(false, 0.1);
setRate(true, 0); // needs to be off-mutex
setRate(false, 0);
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
mAcceptsPacketSlicing = false ; // by default. Will be turned into true when everyone's ready. mAcceptsPacketSlicing = false ; // by default. Will be turned into true when everyone's ready.
mLastSentPacketSlicingProbe = 0 ; mLastSentPacketSlicingProbe = 0 ;
mAvgLastUpdate = mCurrReadTS = mCurrSentTS = time(NULL); mAvgLastUpdate = mCurrReadTS = mCurrSentTS = time(NULL);
mIncomingSize = 0 ; mIncomingSize = 0 ;
mStatisticsTimeStamp = 0 ; mStatisticsTimeStamp = 0 ;
/* allocated once */ /* allocated once */
mPkt_rpend_size = 0; mPkt_rpend_size = 0;
mPkt_rpending = 0; mPkt_rpending = 0;
mReading_state = reading_state_initial ; mReading_state = reading_state_initial ;
// 100 B/s (minimal) pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::pqistreamer() Initialisation!");
setMaxRate(true, 0.1);
setMaxRate(false, 0.1);
setRate(true, 0);
setRate(false, 0);
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::pqistreamer() Initialisation!"); if (!bio_in)
{
pqioutput(PQL_ALERT, pqistreamerzone, "pqistreamer::pqistreamer() NULL bio, FATAL ERROR!");
exit(1);
}
if (!bio_in) mFailed_read_attempts = 0; // reset failed read, as no packet is still read.
{
pqioutput(PQL_ALERT, pqistreamerzone, "pqistreamer::pqistreamer() NULL bio, FATAL ERROR!");
exit(1);
}
mFailed_read_attempts = 0; // reset failed read, as no packet is still read. return;
return;
} }
pqistreamer::~pqistreamer() pqistreamer::~pqistreamer()
@ -195,6 +196,22 @@ RsItem *pqistreamer::GetItem()
return osr; return osr;
} }
float pqistreamer::getRate(bool b)
{
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
return RateInterface::getRate(b) ;
}
void pqistreamer::setMaxRate(bool b,float f)
{
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
RateInterface::setMaxRate(b,f) ;
}
void pqistreamer::setRate(bool b,float f)
{
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
RateInterface::setRate(b,f) ;
}
void pqistreamer::updateRates() void pqistreamer::updateRates()
{ {
// now update rates both ways. // now update rates both ways.

View file

@ -67,6 +67,12 @@ class pqistreamer: public PQInterface
virtual void getRates(RsBwRates &rates); virtual void getRates(RsBwRates &rates);
virtual int getQueueSize(bool in); // extracting data. virtual int getQueueSize(bool in); // extracting data.
virtual int gatherStatistics(std::list<RSTrafficClue>& outqueue_stats,std::list<RSTrafficClue>& inqueue_stats); // extracting data. virtual int gatherStatistics(std::list<RSTrafficClue>& outqueue_stats,std::list<RSTrafficClue>& inqueue_stats); // extracting data.
// mutex protected versions of RateInterface calls.
virtual void setRate(bool b,float f) ;
virtual void setMaxRate(bool b,float f) ;
virtual float getRate(bool b) ;
protected: protected:
int tick_bio(); int tick_bio();
@ -85,7 +91,7 @@ class pqistreamer: public PQInterface
virtual int locked_gatherStatistics(std::list<RSTrafficClue>& outqueue_stats,std::list<RSTrafficClue>& inqueue_stats); // extracting data. virtual int locked_gatherStatistics(std::list<RSTrafficClue>& outqueue_stats,std::list<RSTrafficClue>& inqueue_stats); // extracting data.
void updateRates() ; void updateRates() ;
protected: protected:
RsMutex mStreamerMtx ; // Protects data, fns below, protected so pqiqos can use it too. RsMutex mStreamerMtx ; // Protects data, fns below, protected so pqiqos can use it too.