diff --git a/libretroshare/src/pqi/pqistreamer.cc b/libretroshare/src/pqi/pqistreamer.cc index 393f2ac70..62b4d7f84 100644 --- a/libretroshare/src/pqi/pqistreamer.cc +++ b/libretroshare/src/pqi/pqistreamer.cc @@ -1,5 +1,5 @@ /******************************************************************************* - * libretroshare/src/pqi: pqistreamer.h * + * libretroshare/src/pqi: pqistreamer.cc * * * * libretroshare: retroshare core library * * * @@ -102,38 +102,39 @@ pqistreamer::pqistreamer(RsSerialiser *rss, const RsPeerId& id, BinInterface *bi mAvgDtOut(0), mAvgDtIn(0) { - // 100 B/s (minimal) - setMaxRate(true, 0.1); - setMaxRate(false, 0.1); - setRate(true, 0); // needs to be off-mutex - setRate(false, 0); + // 100 B/s (minimal) + setMaxRate(true, 0.1); + setMaxRate(false, 0.1); + setRate(true, 0); // needs to be off-mutex + setRate(false, 0); - RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ + RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ - mAcceptsPacketSlicing = false ; // by default. Will be turned into true when everyone's ready. - mLastSentPacketSlicingProbe = 0 ; + mAcceptsPacketSlicing = false ; // by default. Will be turned into true when everyone's ready. + mLastSentPacketSlicingProbe = 0 ; - mAvgLastUpdate = mCurrSentTS = mCurrReadTS = getCurrentTS(); + mAvgLastUpdate = mCurrSentTS = mCurrReadTS = getCurrentTS(); - mIncomingSize = 0 ; + mIncomingSize = 0 ; + mIncomingSize_bytes = 0; - mStatisticsTimeStamp = 0 ; - /* allocated once */ - mPkt_rpend_size = 0; - mPkt_rpending = 0; - mReading_state = reading_state_initial ; + mStatisticsTimeStamp = 0 ; + /* allocated once */ + mPkt_rpend_size = 0; + mPkt_rpending = 0; + mReading_state = reading_state_initial ; - pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::pqistreamer() Initialisation!"); + pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::pqistreamer() Initialisation!"); - if (!bio_in) - { - pqioutput(PQL_ALERT, pqistreamerzone, "pqistreamer::pqistreamer() NULL bio, FATAL ERROR!"); - exit(1); - } + if (!bio_in) + { + pqioutput(PQL_ALERT, pqistreamerzone, "pqistreamer::pqistreamer() NULL bio, FATAL ERROR!"); + exit(1); + } - mFailed_read_attempts = 0; // reset failed read, as no packet is still read. + mFailed_read_attempts = 0; // reset failed read, as no packet is still read. - return; + return; } pqistreamer::~pqistreamer() @@ -177,6 +178,7 @@ pqistreamer::~pqistreamer() // Get/Send Items. +// This is the entry poing for methods willing to send items through our out queue int pqistreamer::SendItem(RsItem *si,uint32_t& out_size) { #ifdef RSITEM_DEBUG @@ -205,10 +207,24 @@ RsItem *pqistreamer::GetItem() RsItem *osr = mIncoming.front() ; mIncoming.pop_front() ; --mIncomingSize; +// for future use +// mIncomingSize_bytes -= return osr; } + +float pqistreamer::getMaxRate(bool b) +{ + RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ + return getMaxRate_locked(b); +} + +float pqistreamer::getMaxRate_locked(bool b) +{ + return RateInterface::getMaxRate(b) ; +} + float pqistreamer::getRate(bool b) { RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ @@ -217,26 +233,28 @@ float pqistreamer::getRate(bool b) void pqistreamer::setMaxRate(bool b,float f) { - RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ - RateInterface::setMaxRate(b,f) ; + RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ + setMaxRate_locked(b,f); } + +void pqistreamer::setMaxRate_locked(bool b,float f) +{ + RateInterface::setMaxRate(b,f) ; +} + void pqistreamer::setRate(bool b,float f) { RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ RateInterface::setRate(b,f) ; } + void pqistreamer::updateRates() { - // update rates both ways. + // update actual rates both ways. double t = getCurrentTS(); // get current timestamp. - double diff ; - - { - RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ - diff = t - mAvgLastUpdate ; - } + double diff = t - mAvgLastUpdate; if (diff > PQISTREAM_AVG_PERIOD) { @@ -261,10 +279,11 @@ void pqistreamer::updateRates() setRate(false, 0); } + mAvgLastUpdate = t; + mAvgReadCount = 0; + { RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ - mAvgLastUpdate = t; - mAvgReadCount = 0; mAvgSentCount = 0; } } @@ -275,7 +294,7 @@ int pqistreamer::tick_bio() RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ mBio->tick(); - /* short circuit everything is bio isn't active */ + /* short circuit everything if bio isn't active */ if (!(mBio->isactive())) { return 0; @@ -283,21 +302,12 @@ int pqistreamer::tick_bio() return 1; } - int pqistreamer::tick_recv(uint32_t timeout) { // Apart from a few exceptions that are atomic (mLastIncomingTs, mIncomingSize), only this pqi thread reads/writes mIncoming queue and related counters. -// The lock of pqistreamer mutex is thus not needed. -// The mutex lock is still needed before calling locked_addTrafficClue because this method is also used by the thread pushing packets in mOutPkts -// -// The following methods have been renamed by removing the 'locked' part of the name: -// - handleincoming_locked -// - handleincomingitem_locked -// - inReadBytes_locked -// - inAllowedBytes_locked -// - addPartialPacket_locked -// - allocate_rpend_locked -// - free_pend_locked; +// The lock of pqistreamer mutex is thus not needed here. +// The mutex lock is still needed before calling locked_addTrafficClue because this method is also used by the thread pushing packets in mOutPkts. +// Locks around rates are provided internally. if (mBio->moretoread(timeout)) { @@ -310,7 +320,6 @@ int pqistreamer::tick_recv(uint32_t timeout) return 1; } - int pqistreamer::tick_send(uint32_t timeout) { /* short circuit everything if bio isn't active */ @@ -348,12 +357,11 @@ int pqistreamer::status() return 0; } +// this method is overloaded by pqiqosstreamer void pqistreamer::locked_storeInOutputQueue(void *ptr,int,int) { mOutPkts.push_back(ptr); } -// -/**************** HANDLE OUTGOING TRANSLATION + TRANSMISSION ******/ int pqistreamer::queue_outpqi_locked(RsItem *pqi,uint32_t& pktsize) { @@ -362,7 +370,6 @@ int pqistreamer::queue_outpqi_locked(RsItem *pqi,uint32_t& pktsize) std::cerr << "pqistreamer::queue_outpqi() called." << std::endl; #endif - /* decide which type of packet it is */ pktsize = mRsSerialiser->size(pqi); @@ -370,7 +377,6 @@ int pqistreamer::queue_outpqi_locked(RsItem *pqi,uint32_t& pktsize) if(ptr == NULL) return 0 ; - #ifdef DEBUG_PQISTREAMER std::cerr << "pqistreamer::queue_outpqi() serializing packet with packet size : " << pktsize << std::endl; @@ -424,16 +430,17 @@ int pqistreamer::handleincomingitem(RsItem *pqi,int len) pqi -> PeerId(PeerId()); mIncoming.push_back(pqi); - ++mIncomingSize ; + ++mIncomingSize; + // for future use + // mIncomingSize_bytes += len; /*******************************************************************************************/ // keep info for stats for a while. Only keep the items for the last two seconds. sec n is ongoing and second n-1 // is a full statistics chunk that can be used in the GUI - - RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ - - locked_addTrafficClue(pqi,len,mCurrentStatsChunk_In) ; - + { + RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ + locked_addTrafficClue(pqi,len,mCurrentStatsChunk_In) ; + } /*******************************************************************************************/ return 1; @@ -467,8 +474,8 @@ void pqistreamer::locked_addTrafficClue(const RsItem *pqi,uint32_t pktsize,std:: rstime_t pqistreamer::getLastIncomingTS() { -// This is the only case where another thread (rs main for pqiperson) will access our data -// Still a mutex lock is not needed because the operation is atomic + // This is the only case where another thread (rs main for pqiperson) will access our data + // Still a mutex lock is not needed because the operation is atomic return mLastIncomingTs; } @@ -1145,7 +1152,7 @@ int pqistreamer::outAllowedBytes_locked() // low pass filter on mAvgDtOut mAvgDtOut = PQISTREAM_AVG_DT_FRAC * mAvgDtOut + (1 - PQISTREAM_AVG_DT_FRAC) * dt; - double maxout = getMaxRate(false) * 1024.0; + double maxout = getMaxRate_locked(false) * 1024.0; // this is used to take into account a possible excess of data sent during the previous round mCurrSent -= int(dt * maxout); @@ -1330,6 +1337,7 @@ int pqistreamer::gatherStatistics(std::list& outqueue_lst,std return locked_gatherStatistics(outqueue_lst,inqueue_lst); } +// this method is overloaded by pqiqosstreamer int pqistreamer::getQueueSize(bool in) { if (in) @@ -1342,6 +1350,19 @@ int pqistreamer::getQueueSize(bool in) } } +int pqistreamer::getQueueSize_bytes(bool in) +{ + if (in) +// no mutex is needed here because this is atomic +// for future use, mIncomingSize_bytes is not updated yet + return mIncomingSize_bytes; + else + { + RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ + return locked_compute_out_pkt_size(); + } +} + void pqistreamer::getRates(RsBwRates &rates) { RateInterface::getRates(rates); @@ -1349,10 +1370,13 @@ void pqistreamer::getRates(RsBwRates &rates) // no mutex is needed here because this is atomic rates.mQueueIn = mIncomingSize; - RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ - rates.mQueueOut = locked_out_queue_size(); + { + RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ + rates.mQueueOut = locked_out_queue_size(); + } } +// this method is overloaded by pqiqosstreamer int pqistreamer::locked_out_queue_size() const { // Warning: because out_pkt is a list, calling size @@ -1362,6 +1386,7 @@ int pqistreamer::locked_out_queue_size() const return mOutPkts.size() ; } +// this method is overloaded by pqiqosstreamer void pqistreamer::locked_clear_out_queue() { for(std::list::iterator it = mOutPkts.begin(); it != mOutPkts.end(); ) @@ -1376,6 +1401,7 @@ void pqistreamer::locked_clear_out_queue() } } +// this method is overloaded by pqiqosstreamer int pqistreamer::locked_compute_out_pkt_size() const { int total = 0 ; @@ -1394,6 +1420,7 @@ int pqistreamer::locked_gatherStatistics(std::list& out_lst,std:: return 1 ; } +// this method is overloaded by pqiqosstreamer void *pqistreamer::locked_pop_out_data(uint32_t /*max_slice_size*/, uint32_t &size, bool &starts, bool &ends, uint32_t &packet_id) { size = 0 ; diff --git a/libretroshare/src/pqi/pqistreamer.h b/libretroshare/src/pqi/pqistreamer.h index 1589448c9..0e7f6b815 100644 --- a/libretroshare/src/pqi/pqistreamer.h +++ b/libretroshare/src/pqi/pqistreamer.h @@ -38,8 +38,8 @@ class RsSerialiser; struct PartialPacketRecord { - void *mem ; - uint32_t size ; + void *mem ; + uint32_t size ; }; /** @@ -65,18 +65,23 @@ class pqistreamer: public PQInterface virtual RsItem *GetItem(); virtual int status(); - rstime_t getLastIncomingTS(); // Time of last data packet, for checking a connection is alive. + rstime_t getLastIncomingTS(); // Time of last data packet, for checking a connection is alive. virtual void getRates(RsBwRates &rates); virtual int getQueueSize(bool in); // extracting data. + virtual int getQueueSize_bytes(bool in); // size of incoming queue in bytes virtual int gatherStatistics(std::list& outqueue_stats,std::list& inqueue_stats); // extracting data. // mutex protected versions of RateInterface calls. virtual void setRate(bool b,float f) ; virtual void setMaxRate(bool b,float f) ; - virtual float getRate(bool b) ; + virtual void setMaxRate_locked(bool b,float f) ; - protected: - virtual int reset() ; + virtual float getRate(bool b) ; + virtual float getMaxRate(bool b) ; + virtual float getMaxRate_locked(bool b); + + protected: + virtual int reset() ; int tick_bio(); int tick_send(uint32_t timeout); @@ -129,13 +134,12 @@ class pqistreamer: public PQInterface void *mPkt_wpending; // storage for pending packet to write. uint32_t mPkt_wpending_size; // ... and its size. - void allocate_rpend(); // use these two functions to allocate/free the buffer below + void allocate_rpend(); // use these two functions to allocate/free the buffer below int mPkt_rpend_size; // size of pkt_rpending. void *mPkt_rpending; // storage for read in pending packets. - enum {reading_state_packet_started=1, - reading_state_initial=0 } ; + enum {reading_state_packet_started=1, reading_state_initial=0 } ; int mReading_state ; int mFailed_read_attempts ; @@ -144,7 +148,8 @@ class pqistreamer: public PQInterface std::list mOutPkts; // Cntrl / Search / Results queue std::list mIncoming; - uint32_t mIncomingSize; // size of mIncoming. To avoid calling linear cost std::list::size() + uint32_t mIncomingSize; // size of mIncoming. To avoid calling linear cost std::list::size() + uint32_t mIncomingSize_bytes; // size of Incoming in btyes // data for network stats. int mTotalRead; @@ -154,8 +159,8 @@ class pqistreamer: public PQInterface int mCurrRead; int mCurrSent; - double mCurrReadTS; // TS from which these are measured. - double mCurrSentTS; + double mCurrReadTS; // TS from which these are measured. + double mCurrSentTS; double mAvgLastUpdate; // TS from which these are measured. uint32_t mAvgReadCount; @@ -174,12 +179,12 @@ class pqistreamer: public PQInterface std::list mCurrentStatsChunk_Out ; rstime_t mStatisticsTimeStamp ; - bool mAcceptsPacketSlicing ; - rstime_t mLastSentPacketSlicingProbe ; - void locked_addTrafficClue(const RsItem *pqi, uint32_t pktsize, std::list &lst); - RsItem *addPartialPacket(const void *block, uint32_t len, uint32_t slice_packet_id,bool packet_starting,bool packet_ending,uint32_t& total_len); + bool mAcceptsPacketSlicing ; + rstime_t mLastSentPacketSlicingProbe ; + void locked_addTrafficClue(const RsItem *pqi, uint32_t pktsize, std::list &lst); + RsItem *addPartialPacket(const void *block, uint32_t len, uint32_t slice_packet_id,bool packet_starting,bool packet_ending,uint32_t& total_len); - std::map mPartialPackets ; + std::map mPartialPackets ; }; #endif //MRK_PQI_STREAMER_HEADER