finish removing pqistreamer locks, clean rates methods

This commit is contained in:
jolavillette 2020-05-04 22:42:18 +02:00
parent 28458bf10a
commit 7113d1935f
2 changed files with 113 additions and 81 deletions

View File

@ -1,5 +1,5 @@
/*******************************************************************************
* libretroshare/src/pqi: pqistreamer.h *
* libretroshare/src/pqi: pqistreamer.cc *
* *
* libretroshare: retroshare core library *
* *
@ -102,38 +102,39 @@ pqistreamer::pqistreamer(RsSerialiser *rss, const RsPeerId& id, BinInterface *bi
mAvgDtOut(0), mAvgDtIn(0)
{
// 100 B/s (minimal)
setMaxRate(true, 0.1);
setMaxRate(false, 0.1);
setRate(true, 0); // needs to be off-mutex
setRate(false, 0);
// 100 B/s (minimal)
setMaxRate(true, 0.1);
setMaxRate(false, 0.1);
setRate(true, 0); // needs to be off-mutex
setRate(false, 0);
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
mAcceptsPacketSlicing = false ; // by default. Will be turned into true when everyone's ready.
mLastSentPacketSlicingProbe = 0 ;
mAcceptsPacketSlicing = false ; // by default. Will be turned into true when everyone's ready.
mLastSentPacketSlicingProbe = 0 ;
mAvgLastUpdate = mCurrSentTS = mCurrReadTS = getCurrentTS();
mAvgLastUpdate = mCurrSentTS = mCurrReadTS = getCurrentTS();
mIncomingSize = 0 ;
mIncomingSize = 0 ;
mIncomingSize_bytes = 0;
mStatisticsTimeStamp = 0 ;
/* allocated once */
mPkt_rpend_size = 0;
mPkt_rpending = 0;
mReading_state = reading_state_initial ;
mStatisticsTimeStamp = 0 ;
/* allocated once */
mPkt_rpend_size = 0;
mPkt_rpending = 0;
mReading_state = reading_state_initial ;
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::pqistreamer() Initialisation!");
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::pqistreamer() Initialisation!");
if (!bio_in)
{
pqioutput(PQL_ALERT, pqistreamerzone, "pqistreamer::pqistreamer() NULL bio, FATAL ERROR!");
exit(1);
}
if (!bio_in)
{
pqioutput(PQL_ALERT, pqistreamerzone, "pqistreamer::pqistreamer() NULL bio, FATAL ERROR!");
exit(1);
}
mFailed_read_attempts = 0; // reset failed read, as no packet is still read.
mFailed_read_attempts = 0; // reset failed read, as no packet is still read.
return;
return;
}
pqistreamer::~pqistreamer()
@ -177,6 +178,7 @@ pqistreamer::~pqistreamer()
// Get/Send Items.
// This is the entry poing for methods willing to send items through our out queue
int pqistreamer::SendItem(RsItem *si,uint32_t& out_size)
{
#ifdef RSITEM_DEBUG
@ -205,10 +207,24 @@ RsItem *pqistreamer::GetItem()
RsItem *osr = mIncoming.front() ;
mIncoming.pop_front() ;
--mIncomingSize;
// for future use
// mIncomingSize_bytes -=
return osr;
}
float pqistreamer::getMaxRate(bool b)
{
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
return getMaxRate_locked(b);
}
float pqistreamer::getMaxRate_locked(bool b)
{
return RateInterface::getMaxRate(b) ;
}
float pqistreamer::getRate(bool b)
{
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
@ -217,26 +233,28 @@ float pqistreamer::getRate(bool b)
void pqistreamer::setMaxRate(bool b,float f)
{
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
RateInterface::setMaxRate(b,f) ;
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
setMaxRate_locked(b,f);
}
void pqistreamer::setMaxRate_locked(bool b,float f)
{
RateInterface::setMaxRate(b,f) ;
}
void pqistreamer::setRate(bool b,float f)
{
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
RateInterface::setRate(b,f) ;
}
void pqistreamer::updateRates()
{
// update rates both ways.
// update actual rates both ways.
double t = getCurrentTS(); // get current timestamp.
double diff ;
{
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
diff = t - mAvgLastUpdate ;
}
double diff = t - mAvgLastUpdate;
if (diff > PQISTREAM_AVG_PERIOD)
{
@ -261,10 +279,11 @@ void pqistreamer::updateRates()
setRate(false, 0);
}
mAvgLastUpdate = t;
mAvgReadCount = 0;
{
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
mAvgLastUpdate = t;
mAvgReadCount = 0;
mAvgSentCount = 0;
}
}
@ -275,7 +294,7 @@ int pqistreamer::tick_bio()
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
mBio->tick();
/* short circuit everything is bio isn't active */
/* short circuit everything if bio isn't active */
if (!(mBio->isactive()))
{
return 0;
@ -283,21 +302,12 @@ int pqistreamer::tick_bio()
return 1;
}
int pqistreamer::tick_recv(uint32_t timeout)
{
// Apart from a few exceptions that are atomic (mLastIncomingTs, mIncomingSize), only this pqi thread reads/writes mIncoming queue and related counters.
// The lock of pqistreamer mutex is thus not needed.
// The mutex lock is still needed before calling locked_addTrafficClue because this method is also used by the thread pushing packets in mOutPkts
//
// The following methods have been renamed by removing the 'locked' part of the name:
// - handleincoming_locked
// - handleincomingitem_locked
// - inReadBytes_locked
// - inAllowedBytes_locked
// - addPartialPacket_locked
// - allocate_rpend_locked
// - free_pend_locked;
// The lock of pqistreamer mutex is thus not needed here.
// The mutex lock is still needed before calling locked_addTrafficClue because this method is also used by the thread pushing packets in mOutPkts.
// Locks around rates are provided internally.
if (mBio->moretoread(timeout))
{
@ -310,7 +320,6 @@ int pqistreamer::tick_recv(uint32_t timeout)
return 1;
}
int pqistreamer::tick_send(uint32_t timeout)
{
/* short circuit everything if bio isn't active */
@ -348,12 +357,11 @@ int pqistreamer::status()
return 0;
}
// this method is overloaded by pqiqosstreamer
void pqistreamer::locked_storeInOutputQueue(void *ptr,int,int)
{
mOutPkts.push_back(ptr);
}
//
/**************** HANDLE OUTGOING TRANSLATION + TRANSMISSION ******/
int pqistreamer::queue_outpqi_locked(RsItem *pqi,uint32_t& pktsize)
{
@ -362,7 +370,6 @@ int pqistreamer::queue_outpqi_locked(RsItem *pqi,uint32_t& pktsize)
std::cerr << "pqistreamer::queue_outpqi() called." << std::endl;
#endif
/* decide which type of packet it is */
pktsize = mRsSerialiser->size(pqi);
@ -370,7 +377,6 @@ int pqistreamer::queue_outpqi_locked(RsItem *pqi,uint32_t& pktsize)
if(ptr == NULL)
return 0 ;
#ifdef DEBUG_PQISTREAMER
std::cerr << "pqistreamer::queue_outpqi() serializing packet with packet size : " << pktsize << std::endl;
@ -424,16 +430,17 @@ int pqistreamer::handleincomingitem(RsItem *pqi,int len)
pqi -> PeerId(PeerId());
mIncoming.push_back(pqi);
++mIncomingSize ;
++mIncomingSize;
// for future use
// mIncomingSize_bytes += len;
/*******************************************************************************************/
// keep info for stats for a while. Only keep the items for the last two seconds. sec n is ongoing and second n-1
// is a full statistics chunk that can be used in the GUI
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
locked_addTrafficClue(pqi,len,mCurrentStatsChunk_In) ;
{
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
locked_addTrafficClue(pqi,len,mCurrentStatsChunk_In) ;
}
/*******************************************************************************************/
return 1;
@ -467,8 +474,8 @@ void pqistreamer::locked_addTrafficClue(const RsItem *pqi,uint32_t pktsize,std::
rstime_t pqistreamer::getLastIncomingTS()
{
// This is the only case where another thread (rs main for pqiperson) will access our data
// Still a mutex lock is not needed because the operation is atomic
// This is the only case where another thread (rs main for pqiperson) will access our data
// Still a mutex lock is not needed because the operation is atomic
return mLastIncomingTs;
}
@ -1145,7 +1152,7 @@ int pqistreamer::outAllowedBytes_locked()
// low pass filter on mAvgDtOut
mAvgDtOut = PQISTREAM_AVG_DT_FRAC * mAvgDtOut + (1 - PQISTREAM_AVG_DT_FRAC) * dt;
double maxout = getMaxRate(false) * 1024.0;
double maxout = getMaxRate_locked(false) * 1024.0;
// this is used to take into account a possible excess of data sent during the previous round
mCurrSent -= int(dt * maxout);
@ -1330,6 +1337,7 @@ int pqistreamer::gatherStatistics(std::list<RSTrafficClue>& outqueue_lst,std
return locked_gatherStatistics(outqueue_lst,inqueue_lst);
}
// this method is overloaded by pqiqosstreamer
int pqistreamer::getQueueSize(bool in)
{
if (in)
@ -1342,6 +1350,19 @@ int pqistreamer::getQueueSize(bool in)
}
}
int pqistreamer::getQueueSize_bytes(bool in)
{
if (in)
// no mutex is needed here because this is atomic
// for future use, mIncomingSize_bytes is not updated yet
return mIncomingSize_bytes;
else
{
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
return locked_compute_out_pkt_size();
}
}
void pqistreamer::getRates(RsBwRates &rates)
{
RateInterface::getRates(rates);
@ -1349,10 +1370,13 @@ void pqistreamer::getRates(RsBwRates &rates)
// no mutex is needed here because this is atomic
rates.mQueueIn = mIncomingSize;
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
rates.mQueueOut = locked_out_queue_size();
{
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
rates.mQueueOut = locked_out_queue_size();
}
}
// this method is overloaded by pqiqosstreamer
int pqistreamer::locked_out_queue_size() const
{
// Warning: because out_pkt is a list, calling size
@ -1362,6 +1386,7 @@ int pqistreamer::locked_out_queue_size() const
return mOutPkts.size() ;
}
// this method is overloaded by pqiqosstreamer
void pqistreamer::locked_clear_out_queue()
{
for(std::list<void*>::iterator it = mOutPkts.begin(); it != mOutPkts.end(); )
@ -1376,6 +1401,7 @@ void pqistreamer::locked_clear_out_queue()
}
}
// this method is overloaded by pqiqosstreamer
int pqistreamer::locked_compute_out_pkt_size() const
{
int total = 0 ;
@ -1394,6 +1420,7 @@ int pqistreamer::locked_gatherStatistics(std::list<RSTrafficClue>& out_lst,std::
return 1 ;
}
// this method is overloaded by pqiqosstreamer
void *pqistreamer::locked_pop_out_data(uint32_t /*max_slice_size*/, uint32_t &size, bool &starts, bool &ends, uint32_t &packet_id)
{
size = 0 ;

View File

@ -38,8 +38,8 @@ class RsSerialiser;
struct PartialPacketRecord
{
void *mem ;
uint32_t size ;
void *mem ;
uint32_t size ;
};
/**
@ -65,18 +65,23 @@ class pqistreamer: public PQInterface
virtual RsItem *GetItem();
virtual int status();
rstime_t getLastIncomingTS(); // Time of last data packet, for checking a connection is alive.
rstime_t getLastIncomingTS(); // Time of last data packet, for checking a connection is alive.
virtual void getRates(RsBwRates &rates);
virtual int getQueueSize(bool in); // extracting data.
virtual int getQueueSize_bytes(bool in); // size of incoming queue in bytes
virtual int gatherStatistics(std::list<RSTrafficClue>& outqueue_stats,std::list<RSTrafficClue>& inqueue_stats); // extracting data.
// mutex protected versions of RateInterface calls.
virtual void setRate(bool b,float f) ;
virtual void setMaxRate(bool b,float f) ;
virtual float getRate(bool b) ;
virtual void setMaxRate_locked(bool b,float f) ;
protected:
virtual int reset() ;
virtual float getRate(bool b) ;
virtual float getMaxRate(bool b) ;
virtual float getMaxRate_locked(bool b);
protected:
virtual int reset() ;
int tick_bio();
int tick_send(uint32_t timeout);
@ -129,13 +134,12 @@ class pqistreamer: public PQInterface
void *mPkt_wpending; // storage for pending packet to write.
uint32_t mPkt_wpending_size; // ... and its size.
void allocate_rpend(); // use these two functions to allocate/free the buffer below
void allocate_rpend(); // use these two functions to allocate/free the buffer below
int mPkt_rpend_size; // size of pkt_rpending.
void *mPkt_rpending; // storage for read in pending packets.
enum {reading_state_packet_started=1,
reading_state_initial=0 } ;
enum {reading_state_packet_started=1, reading_state_initial=0 } ;
int mReading_state ;
int mFailed_read_attempts ;
@ -144,7 +148,8 @@ class pqistreamer: public PQInterface
std::list<void *> mOutPkts; // Cntrl / Search / Results queue
std::list<RsItem *> mIncoming;
uint32_t mIncomingSize; // size of mIncoming. To avoid calling linear cost std::list::size()
uint32_t mIncomingSize; // size of mIncoming. To avoid calling linear cost std::list::size()
uint32_t mIncomingSize_bytes; // size of Incoming in btyes
// data for network stats.
int mTotalRead;
@ -154,8 +159,8 @@ class pqistreamer: public PQInterface
int mCurrRead;
int mCurrSent;
double mCurrReadTS; // TS from which these are measured.
double mCurrSentTS;
double mCurrReadTS; // TS from which these are measured.
double mCurrSentTS;
double mAvgLastUpdate; // TS from which these are measured.
uint32_t mAvgReadCount;
@ -174,12 +179,12 @@ class pqistreamer: public PQInterface
std::list<RSTrafficClue> mCurrentStatsChunk_Out ;
rstime_t mStatisticsTimeStamp ;
bool mAcceptsPacketSlicing ;
rstime_t mLastSentPacketSlicingProbe ;
void locked_addTrafficClue(const RsItem *pqi, uint32_t pktsize, std::list<RSTrafficClue> &lst);
RsItem *addPartialPacket(const void *block, uint32_t len, uint32_t slice_packet_id,bool packet_starting,bool packet_ending,uint32_t& total_len);
bool mAcceptsPacketSlicing ;
rstime_t mLastSentPacketSlicingProbe ;
void locked_addTrafficClue(const RsItem *pqi, uint32_t pktsize, std::list<RSTrafficClue> &lst);
RsItem *addPartialPacket(const void *block, uint32_t len, uint32_t slice_packet_id,bool packet_starting,bool packet_ending,uint32_t& total_len);
std::map<uint32_t,PartialPacketRecord> mPartialPackets ;
std::map<uint32_t,PartialPacketRecord> mPartialPackets ;
};
#endif //MRK_PQI_STREAMER_HEADER