From 0d1d31a25fdfb7288a59e860a5912dcd58abf7f3 Mon Sep 17 00:00:00 2001 From: csoler Date: Sat, 11 Jun 2016 09:33:11 -0400 Subject: [PATCH] added mutex protection around pqistreamer::getRates, since float r/w are not necessarily atomic --- libretroshare/src/pqi/pqi_base.h | 126 +++++++++++++-------------- libretroshare/src/pqi/pqistreamer.cc | 53 +++++++---- libretroshare/src/pqi/pqistreamer.h | 8 +- 3 files changed, 105 insertions(+), 82 deletions(-) diff --git a/libretroshare/src/pqi/pqi_base.h b/libretroshare/src/pqi/pqi_base.h index 7423baffe..d247d9862 100644 --- a/libretroshare/src/pqi/pqi_base.h +++ b/libretroshare/src/pqi/pqi_base.h @@ -80,103 +80,103 @@ class RateInterface public: RateInterface() - :bw_in(0), bw_out(0), bwMax_in(0), bwMax_out(0), - bwCapEnabled(false), bwCap_in(0), bwCap_out(0) { return; } + :bw_in(0), bw_out(0), bwMax_in(0), bwMax_out(0), + bwCapEnabled(false), bwCap_in(0), bwCap_out(0) { return; } -virtual ~RateInterface() { return; } + virtual ~RateInterface() { return; } -virtual void getRates(RsBwRates &rates) -{ - rates.mRateIn = bw_in; - rates.mRateOut = bw_out; - rates.mMaxRateIn = bwMax_in; - rates.mMaxRateOut = bwMax_out; - return; -} - - virtual int gatherStatistics(std::list& /* outqueue_lst */,std::list& /* inqueue_lst */) { return 0;} - -virtual int getQueueSize(bool /* in */) { return 0;} -virtual float getRate(bool in) + virtual void getRates(RsBwRates &rates) { - if (in) - return bw_in; - return bw_out; + rates.mRateIn = bw_in; + rates.mRateOut = bw_out; + rates.mMaxRateIn = bwMax_in; + rates.mMaxRateOut = bwMax_out; + return; } -virtual float getMaxRate(bool in) + virtual int gatherStatistics(std::list& /* outqueue_lst */,std::list& /* inqueue_lst */) { return 0;} + + virtual int getQueueSize(bool /* in */) { return 0;} + virtual float getRate(bool in) { - if (in) - return bwMax_in; - return bwMax_out; + if (in) + return bw_in; + return bw_out; } -virtual void setMaxRate(bool in, float val) + virtual float getMaxRate(bool in) { - if (in) + if (in) + return bwMax_in; + return bwMax_out; + } + + virtual void setMaxRate(bool in, float val) { - bwMax_in = val; - if (bwCapEnabled) + if (in) { - if (bwMax_in > bwCap_in) + bwMax_in = val; + if (bwCapEnabled) { - bwMax_in = bwCap_in; + if (bwMax_in > bwCap_in) + { + bwMax_in = bwCap_in; + } } } - } - else - { - bwMax_out = val; - if (bwCapEnabled) + else { - if (bwMax_out > bwCap_out) + bwMax_out = val; + if (bwCapEnabled) { - bwMax_out = bwCap_out; + if (bwMax_out > bwCap_out) + { + bwMax_out = bwCap_out; + } } } - } - return; + return; } -virtual void setRateCap(float val_in, float val_out) -{ - if ((val_in == 0) && (val_out == 0)) + virtual void setRateCap(float val_in, float val_out) { + if ((val_in == 0) && (val_out == 0)) + { #ifdef DEBUG_RATECAP - std::cerr << "RateInterface::setRateCap() Now disabled" << std::endl; + std::cerr << "RateInterface::setRateCap() Now disabled" << std::endl; #endif - bwCapEnabled = false; - } - else - { + bwCapEnabled = false; + } + else + { #ifdef DEBUG_RATECAP - std::cerr << "RateInterface::setRateCap() Enabled "; - std::cerr << "in: " << bwCap_in << " out: " << bwCap_out << std::endl; + std::cerr << "RateInterface::setRateCap() Enabled "; + std::cerr << "in: " << bwCap_in << " out: " << bwCap_out << std::endl; #endif - bwCapEnabled = true; - bwCap_in = val_in; - bwCap_out = val_out; + bwCapEnabled = true; + bwCap_in = val_in; + bwCap_out = val_out; + } + return; } - return; -} protected: -void setRate(bool in, float val) + virtual void setRate(bool in, float val) { - if (in) - bw_in = val; - else - bw_out = val; - return; + if (in) + bw_in = val; + else + bw_out = val; + return; } - private: -float bw_in, bw_out, bwMax_in, bwMax_out; -bool bwCapEnabled; -float bwCap_in, bwCap_out; +private: + float bw_in, bw_out, bwMax_in, bwMax_out; + bool bwCapEnabled; + float bwCap_in, bwCap_out; }; diff --git a/libretroshare/src/pqi/pqistreamer.cc b/libretroshare/src/pqi/pqistreamer.cc index 4a4db5c63..37f3f8945 100644 --- a/libretroshare/src/pqi/pqistreamer.cc +++ b/libretroshare/src/pqi/pqistreamer.cc @@ -77,37 +77,38 @@ pqistreamer::pqistreamer(RsSerialiser *rss, const RsPeerId& id, BinInterface *bi mCurrRead(0), mCurrSent(0), mAvgReadCount(0), mAvgSentCount(0) { - RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ + + // 100 B/s (minimal) + setMaxRate(true, 0.1); + setMaxRate(false, 0.1); + setRate(true, 0); // needs to be off-mutex + setRate(false, 0); + + RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ mAcceptsPacketSlicing = false ; // by default. Will be turned into true when everyone's ready. mLastSentPacketSlicingProbe = 0 ; - + mAvgLastUpdate = mCurrReadTS = mCurrSentTS = time(NULL); mIncomingSize = 0 ; mStatisticsTimeStamp = 0 ; - /* allocated once */ + /* allocated once */ mPkt_rpend_size = 0; mPkt_rpending = 0; - mReading_state = reading_state_initial ; + mReading_state = reading_state_initial ; - // 100 B/s (minimal) - setMaxRate(true, 0.1); - setMaxRate(false, 0.1); - setRate(true, 0); - setRate(false, 0); + pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::pqistreamer() Initialisation!"); - pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::pqistreamer() Initialisation!"); + if (!bio_in) + { + pqioutput(PQL_ALERT, pqistreamerzone, "pqistreamer::pqistreamer() NULL bio, FATAL ERROR!"); + exit(1); + } - if (!bio_in) - { - pqioutput(PQL_ALERT, pqistreamerzone, "pqistreamer::pqistreamer() NULL bio, FATAL ERROR!"); - exit(1); - } + mFailed_read_attempts = 0; // reset failed read, as no packet is still read. - mFailed_read_attempts = 0; // reset failed read, as no packet is still read. - - return; + return; } pqistreamer::~pqistreamer() @@ -195,6 +196,22 @@ RsItem *pqistreamer::GetItem() return osr; } +float pqistreamer::getRate(bool b) +{ + RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ + return RateInterface::getRate(b) ; +} +void pqistreamer::setMaxRate(bool b,float f) +{ + RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ + RateInterface::setMaxRate(b,f) ; +} +void pqistreamer::setRate(bool b,float f) +{ + RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ + RateInterface::setRate(b,f) ; +} + void pqistreamer::updateRates() { // now update rates both ways. diff --git a/libretroshare/src/pqi/pqistreamer.h b/libretroshare/src/pqi/pqistreamer.h index ae94c9c1a..80344d6cd 100644 --- a/libretroshare/src/pqi/pqistreamer.h +++ b/libretroshare/src/pqi/pqistreamer.h @@ -67,6 +67,12 @@ class pqistreamer: public PQInterface virtual void getRates(RsBwRates &rates); virtual int getQueueSize(bool in); // extracting data. virtual int gatherStatistics(std::list& outqueue_stats,std::list& inqueue_stats); // extracting data. + + // mutex protected versions of RateInterface calls. + virtual void setRate(bool b,float f) ; + virtual void setMaxRate(bool b,float f) ; + virtual float getRate(bool b) ; + protected: int tick_bio(); @@ -85,7 +91,7 @@ class pqistreamer: public PQInterface virtual int locked_gatherStatistics(std::list& outqueue_stats,std::list& inqueue_stats); // extracting data. void updateRates() ; - + protected: RsMutex mStreamerMtx ; // Protects data, fns below, protected so pqiqos can use it too.