Mirror of https://github.com/RetroShare/RetroShare.git (synced 2025-02-17 21:34:10 -05:00)

Merge pull request #1914 from jolavillette/pqihandlerOptim: Pqihandler optim

This commit is contained in: 44324fbc98
libretroshare/src/pqi/pqiservice.cc

@@ -1,5 +1,5 @@
 /*******************************************************************************
-* libretroshare/src/pqi: pqiservice.h *
+* libretroshare/src/pqi: pqiservice.cc *
 * *
 * libretroshare: retroshare core library *
 * *

@@ -23,6 +23,22 @@
 #include "util/rsdebug.h"
 #include "util/rsstring.h"
 
+#include <sstream>
+#include <sys/time.h>
+static double getCurrentTS()
+{
+#ifndef WINDOWS_SYS
+    struct timeval cts_tmp;
+    gettimeofday(&cts_tmp, NULL);
+    double cts = (cts_tmp.tv_sec) + ((double) cts_tmp.tv_usec) / 1000000.0;
+#else
+    struct _timeb timebuf;
+    _ftime( &timebuf);
+    double cts = (timebuf.time) + ((double) timebuf.millitm) / 1000.0;
+#endif
+    return cts;
+}
+
 #ifdef SERVICE_DEBUG
 const int pqiservicezone = 60478;
 #endif

@@ -44,7 +60,7 @@ bool pqiService::send(RsRawItem *item)
 
 p3ServiceServer::p3ServiceServer(pqiPublisher *pub, p3ServiceControl *ctrl) : mPublisher(pub), mServiceControl(ctrl), srvMtx("p3ServiceServer")
 {
-    RsStackMutex stack(srvMtx); /********* LOCKED *********/
+    RS_STACK_MUTEX(srvMtx); /********* LOCKED *********/
 
 #ifdef SERVICE_DEBUG
     pqioutput(PQL_DEBUG_BASIC, pqiservicezone,

@@ -56,7 +72,7 @@ p3ServiceServer::p3ServiceServer(pqiPublisher *pub, p3ServiceControl *ctrl) : mP
 
 int p3ServiceServer::addService(pqiService *ts, bool defaultOn)
 {
-    RsStackMutex stack(srvMtx); /********* LOCKED *********/
+    RS_STACK_MUTEX(srvMtx); /********* LOCKED *********/
 
 #ifdef SERVICE_DEBUG
     pqioutput(PQL_DEBUG_BASIC, pqiservicezone,

@@ -84,7 +100,7 @@ int p3ServiceServer::addService(pqiService *ts, bool defaultOn)
 
 bool p3ServiceServer::getServiceItemNames(uint32_t service_type,std::map<uint8_t,std::string>& names)
 {
-    RsStackMutex stack(srvMtx); /********* LOCKED *********/
+    RS_STACK_MUTEX(srvMtx); /********* LOCKED *********/
 
     std::map<uint32_t, pqiService *>::iterator it=services.find(service_type) ;
 

@@ -99,7 +115,7 @@ bool p3ServiceServer::getServiceItemNames(uint32_t service_type,std::map<uint8_t
 
 int p3ServiceServer::removeService(pqiService *ts)
 {
-    RsStackMutex stack(srvMtx); /********* LOCKED *********/
+    RS_STACK_MUTEX(srvMtx); /********* LOCKED *********/
 
 #ifdef SERVICE_DEBUG
     pqioutput(PQL_DEBUG_BASIC, pqiservicezone, "p3ServiceServer::removeService()");

@@ -124,60 +140,33 @@ int p3ServiceServer::removeService(pqiService *ts)
 
 bool p3ServiceServer::recvItem(RsRawItem *item)
 {
-    RsStackMutex stack(srvMtx); /********* LOCKED *********/
-
-#ifdef SERVICE_DEBUG
-    std::cerr << "p3ServiceServer::incoming()";
-    std::cerr << std::endl;
-
-    {
-        std::string out;
-        rs_sprintf(out, "p3ServiceServer::incoming() PacketId: %x\nLooking for Service: %x\nItem:\n", item -> PacketId(), (item -> PacketId() & 0xffffff00));
-        item -> print_string(out);
-        std::cerr << out;
-        std::cerr << std::endl;
-    }
-#endif
-
     // Packet Filtering.
     // This doesn't need to be in Mutex.
     if (!mServiceControl->checkFilter(item->PacketId() & 0xffffff00, item->PeerId()))
     {
-#ifdef SERVICE_DEBUG
-        std::cerr << "p3ServiceServer::recvItem() Fails Filtering " << std::endl;
-#endif
         delete item;
        return false;
    }
 
-    std::map<uint32_t, pqiService *>::iterator it;
-    it = services.find(item -> PacketId() & 0xffffff00);
-    if (it == services.end())
-    {
-#ifdef SERVICE_DEBUG
-        std::cerr << "p3ServiceServer::incoming() Service: No Service - deleting";
-        std::cerr << std::endl;
-#endif
-        delete item;
-        return false;
-    }
-
-    {
-#ifdef SERVICE_DEBUG
-        std::cerr << "p3ServiceServer::incoming() Sending to : " << (void *) it -> second;
-        std::cerr << std::endl;
-#endif
-
-        return (it->second) -> recv(item);
-    }
-
-    delete item;
-    return false;
+    pqiService *s = NULL;
+
+    // access the service map under mutex lock
+    {
+        RS_STACK_MUTEX(srvMtx);
+
+        auto it = services.find(item -> PacketId() & 0xffffff00);
+
+        if (it == services.end())
+        {
+            delete item;
+            return false;
+        }
+        s = it->second;
+    }
+
+    // then call recv off mutex
+    bool result = s->recv(item);
+
+    return result;
 }
 
 
 
 bool p3ServiceServer::sendItem(RsRawItem *item)
 {
 #ifdef SERVICE_DEBUG

@@ -204,40 +193,27 @@ bool p3ServiceServer::sendItem(RsRawItem *item)
 }
 
     mPublisher->sendItem(item);
 
     return true;
 }
 
 
 
 int p3ServiceServer::tick()
 {
 
     mServiceControl->tick();
 
-    RsStackMutex stack(srvMtx); /********* LOCKED *********/
-
-#ifdef SERVICE_DEBUG
-    pqioutput(PQL_DEBUG_ALL, pqiservicezone,
-        "p3ServiceServer::tick()");
-#endif
-
-    std::map<uint32_t, pqiService *>::iterator it;
-
-    // from the beginning to where we started.
-    for(it = services.begin();it != services.end(); ++it)
-    {
-#ifdef SERVICE_DEBUG
-        std::string out;
-        rs_sprintf(out, "p3ServiceServer::service id: %u -> Service: %p", it -> first, it -> second);
-        pqioutput(PQL_DEBUG_ALL, pqiservicezone, out);
-#endif
-
-        // now we should actually tick the service.
+    // make a copy of the service map
+    std::map<uint32_t,pqiService *> local_map;
+
+    {
+        RS_STACK_MUTEX(srvMtx);
+        local_map=services;
+    }
+
+    // tick all services off mutex
+    for(auto it(local_map.begin());it!=local_map.end();++it)
+    {
         (it->second)->tick();
     }
 
     return 1;
 }
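The p3ServiceServer changes above (recvItem() and tick()) share one idea: hold srvMtx only long enough to look up or copy the service table, then call into the service with the mutex released, so a slow recv() or tick() in one service cannot block every other thread that touches p3ServiceServer. Below is a minimal standalone sketch of that pattern, assuming illustrative names (ServiceTable, Service, std::mutex in place of RetroShare's own mutex wrappers):

    #include <cstdint>
    #include <map>
    #include <mutex>

    struct Service
    {
        virtual void tick() = 0;
        virtual ~Service() = default;
    };

    class ServiceTable
    {
    public:
        void tick()
        {
            std::map<uint32_t, Service *> local_map;
            {
                // hold the lock only while copying the small map of pointers
                std::lock_guard<std::mutex> lock(mMtx);
                local_map = mServices;
            }
            // tick every service with the lock released; a slow tick() no longer
            // stalls threads that only need a quick lookup in mServices
            for (auto &entry : local_map)
                entry.second->tick();
        }

    private:
        std::mutex mMtx;
        std::map<uint32_t, Service *> mServices;
    };

The copied pointers must stay valid for the whole loop, so this pattern relies on services not being destroyed while a tick is in flight.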
|
libretroshare/src/pqi/pqissl.cc

@@ -372,7 +372,9 @@ int pqissl::status()
 // tick......
 int pqissl::tick()
 {
-    RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/
+    // there is no reason to lock pqissl mutex now
+    // we will lock the mutex later if we actually need to call to ConnectAttempt
+    // RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/
 
     // pqistreamer::tick();
 

@@ -385,7 +387,8 @@ int pqissl::tick()
 #ifdef PQISSL_LOG_DEBUG
         rslog(RSL_DEBUG_BASIC, pqisslzone, "pqissl::tick() Continuing Connection Attempt!");
 #endif
+        // now lock pqissl mutex, that will take up to 10 ms
+        RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/
         ConnectAttempt();
         return 1;
     }
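The pqissl::tick() change narrows the critical section: the mutex is no longer taken for the whole tick, only on the branch that actually calls ConnectAttempt(). A rough sketch of that shape, with std::mutex and an atomic flag standing in for the real members (mWaiting and connectAttempt() are illustrative names, not the RetroShare API):

    #include <atomic>
    #include <mutex>

    class Link
    {
    public:
        int tick()
        {
            if (!mWaiting.load())                    // cheap unlocked check on the fast path
                return 1;

            std::lock_guard<std::mutex> lock(mMtx);  // lock only when there is real work
            connectAttempt();
            return 1;
        }

    private:
        void connectAttempt() { /* slow connection machinery, still serialized */ }

        std::mutex mMtx;
        std::atomic<bool> mWaiting { false };
    };

The unlocked check is safe here because the flag is read atomically; the PR makes a comparable single-reader assumption about the pqissl state it inspects before deciding to lock.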
|
libretroshare/src/pqi/pqistreamer.cc

@@ -1,5 +1,5 @@
 /*******************************************************************************
-* libretroshare/src/pqi: pqistreamer.h *
+* libretroshare/src/pqi: pqistreamer.cc *
 * *
 * libretroshare: retroshare core library *
 * *

@@ -116,6 +116,7 @@ pqistreamer::pqistreamer(RsSerialiser *rss, const RsPeerId& id, BinInterface *bi
     mAvgLastUpdate = mCurrSentTS = mCurrReadTS = getCurrentTS();
 
     mIncomingSize = 0 ;
+    mIncomingSize_bytes = 0;
 
     mStatisticsTimeStamp = 0 ;
     /* allocated once */

@@ -159,7 +160,7 @@ pqistreamer::~pqistreamer()
     if (mRsSerialiser)
         delete mRsSerialiser;
 
-    free_pend_locked() ;
+    free_pend() ;
 
     // clean up incoming.
     while (!mIncoming.empty())

@@ -177,6 +178,7 @@ pqistreamer::~pqistreamer()
 
 
 // Get/Send Items.
+// This is the entry poing for methods willing to send items through our out queue
 int pqistreamer::SendItem(RsItem *si,uint32_t& out_size)
 {
 #ifdef RSITEM_DEBUG

@@ -199,18 +201,30 @@ RsItem *pqistreamer::GetItem()
     pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::GetItem()");
 #endif
 
-    RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
 
     if(mIncoming.empty())
         return NULL;
 
     RsItem *osr = mIncoming.front() ;
     mIncoming.pop_front() ;
     --mIncomingSize;
+    // for future use
+    // mIncomingSize_bytes -=
 
     return osr;
 }
 
 
+float pqistreamer::getMaxRate(bool b)
+{
+    RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
+    return getMaxRate_locked(b);
+}
+
+float pqistreamer::getMaxRate_locked(bool b)
+{
+    return RateInterface::getMaxRate(b) ;
+}
+
 float pqistreamer::getRate(bool b)
 {
     RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/

@@ -220,25 +234,27 @@ float pqistreamer::getRate(bool b)
 void pqistreamer::setMaxRate(bool b,float f)
 {
     RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
+    setMaxRate_locked(b,f);
+}
+
+void pqistreamer::setMaxRate_locked(bool b,float f)
+{
     RateInterface::setMaxRate(b,f) ;
 }
 
 void pqistreamer::setRate(bool b,float f)
 {
     RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
     RateInterface::setRate(b,f) ;
 }
 
 
 void pqistreamer::updateRates()
 {
-    // update rates both ways.
+    // update actual rates both ways.
 
     double t = getCurrentTS(); // get current timestamp.
-    double diff ;
+    double diff = t - mAvgLastUpdate;
 
-    {
-        RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
-        diff = t - mAvgLastUpdate ;
-    }
-
     if (diff > PQISTREAM_AVG_PERIOD)
     {

@@ -263,10 +279,11 @@ void pqistreamer::updateRates()
         setRate(false, 0);
     }
 
-    {
-        RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
     mAvgLastUpdate = t;
     mAvgReadCount = 0;
 
+    {
+        RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
         mAvgSentCount = 0;
     }
 }

@@ -277,7 +294,7 @@ int pqistreamer::tick_bio()
     RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
     mBio->tick();
 
-    /* short circuit everything is bio isn't active */
+    /* short circuit everything if bio isn't active */
     if (!(mBio->isactive()))
     {
         return 0;

@@ -285,36 +302,36 @@ int pqistreamer::tick_bio()
     return 1;
 }
 
 
 int pqistreamer::tick_recv(uint32_t timeout)
 {
-    RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
+    // Apart from a few exceptions that are atomic (mLastIncomingTs, mIncomingSize), only this pqi thread reads/writes mIncoming queue and related counters.
+    // The lock of pqistreamer mutex is thus not needed here.
+    // The mutex lock is still needed before calling locked_addTrafficClue because this method is also used by the thread pushing packets in mOutPkts.
+    // Locks around rates are provided internally.
 
     if (mBio->moretoread(timeout))
     {
-        handleincoming_locked();
+        handleincoming();
     }
     if(!(mBio->isactive()))
     {
-        free_pend_locked();
+        free_pend();
     }
     return 1;
 }
 
 
 int pqistreamer::tick_send(uint32_t timeout)
 {
-    RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
-
-    /* short circuit everything is bio isn't active */
+    /* short circuit everything if bio isn't active */
     if (!(mBio->isactive()))
     {
-        free_pend_locked();
+        free_pend();
         return 0;
     }
 
     if (mBio->cansend(timeout))
     {
+        RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
         handleoutgoing_locked();
     }
 

@@ -340,12 +357,11 @@ int pqistreamer::status()
     return 0;
 }
 
+// this method is overloaded by pqiqosstreamer
 void pqistreamer::locked_storeInOutputQueue(void *ptr,int,int)
 {
     mOutPkts.push_back(ptr);
 }
-//
-/**************** HANDLE OUTGOING TRANSLATION + TRANSMISSION ******/
 
 int pqistreamer::queue_outpqi_locked(RsItem *pqi,uint32_t& pktsize)
 {

@@ -354,7 +370,6 @@ int pqistreamer::queue_outpqi_locked(RsItem *pqi,uint32_t& pktsize)
     std::cerr << "pqistreamer::queue_outpqi() called." << std::endl;
 #endif
 
-
     /* decide which type of packet it is */
 
     pktsize = mRsSerialiser->size(pqi);

@@ -363,7 +378,6 @@ int pqistreamer::queue_outpqi_locked(RsItem *pqi,uint32_t& pktsize)
     if(ptr == NULL)
         return 0 ;
 
-
 #ifdef DEBUG_PQISTREAMER
     std::cerr << "pqistreamer::queue_outpqi() serializing packet with packet size : " << pktsize << std::endl;
 #endif

@@ -403,26 +417,30 @@ int pqistreamer::queue_outpqi_locked(RsItem *pqi,uint32_t& pktsize)
     return 1; // keep error internal.
 }
 
-int pqistreamer::handleincomingitem_locked(RsItem *pqi,int len)
+int pqistreamer::handleincomingitem(RsItem *pqi,int len)
 {
 
 #ifdef DEBUG_PQISTREAMER
-    pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::handleincomingitem_locked()");
+    pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::handleincomingitem()");
 #endif
     // timestamp last received packet.
     mLastIncomingTs = time(NULL);
 
     // Use overloaded Contact function
     pqi -> PeerId(PeerId());
 
     mIncoming.push_back(pqi);
     ++mIncomingSize;
+    // for future use
+    // mIncomingSize_bytes += len;
 
     /*******************************************************************************************/
     // keep info for stats for a while. Only keep the items for the last two seconds. sec n is ongoing and second n-1
     // is a full statistics chunk that can be used in the GUI
+    {
+        RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
         locked_addTrafficClue(pqi,len,mCurrentStatsChunk_In) ;
+    }
     /*******************************************************************************************/
 
     return 1;

@@ -456,8 +474,8 @@ void pqistreamer::locked_addTrafficClue(const RsItem *pqi,uint32_t pktsize,std::
 
 rstime_t pqistreamer::getLastIncomingTS()
 {
-    RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
+    // This is the only case where another thread (rs main for pqiperson) will access our data
+    // Still a mutex lock is not needed because the operation is atomic
     return mLastIncomingTs;
 }
 

@@ -693,23 +711,23 @@ int pqistreamer::handleoutgoing_locked()
 
 /* Handles reading from input stream.
  */
-int pqistreamer::handleincoming_locked()
+int pqistreamer::handleincoming()
 {
     int readbytes = 0;
     static const int max_failed_read_attempts = 2000 ;
 
 #ifdef DEBUG_PQISTREAMER
-    pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::handleincoming_locked()");
+    pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::handleincoming()");
 #endif
 
     if(!(mBio->isactive()))
     {
         mReading_state = reading_state_initial ;
-        free_pend_locked();
+        free_pend();
         return 0;
     }
     else
-        allocate_rpend_locked();
+        allocate_rpend();
 
     // enough space to read any packet.
     uint32_t maxlen = mPkt_rpend_size;

@@ -718,7 +736,7 @@ int pqistreamer::handleincoming_locked()
     // initial read size: basic packet.
     int blen = getRsPktBaseSize(); // this is valid for both packet slices and normal un-sliced packets (same header size)
 
-    int maxin = inAllowedBytes_locked();
+    int maxin = inAllowedBytes();
 
 #ifdef DEBUG_PQISTREAMER
     std::cerr << "[" << (void*)pthread_self() << "] " << "reading state = " << mReading_state << std::endl ;

@@ -967,19 +985,19 @@ continue_packet:
         std::cerr << "Inputing partial packet " << RsUtil::BinToHex((char*)block,8) << std::endl;
 #endif
         uint32_t packet_length = 0 ;
-        pkt = addPartialPacket_locked(block,pktlen,slice_packet_id,is_packet_starting,is_packet_ending,packet_length) ;
+        pkt = addPartialPacket(block,pktlen,slice_packet_id,is_packet_starting,is_packet_ending,packet_length) ;
 
         pktlen = packet_length ;
     }
     else
         pkt = mRsSerialiser->deserialise(block, &pktlen);
 
-    if ((pkt != NULL) && (0 < handleincomingitem_locked(pkt,pktlen)))
+    if ((pkt != NULL) && (0 < handleincomingitem(pkt,pktlen)))
     {
 #ifdef DEBUG_PQISTREAMER
         pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "Successfully Read a Packet!");
 #endif
-        inReadBytes_locked(pktlen); // only count deserialised packets, because that's what is actually been transfered.
+        inReadBytes(pktlen); // only count deserialised packets, because that's what is actually been transfered.
     }
     else if (!is_partial_packet)
     {

@@ -1012,7 +1030,7 @@ continue_packet:
     return 0;
 }
 
-RsItem *pqistreamer::addPartialPacket_locked(const void *block, uint32_t len, uint32_t slice_packet_id, bool is_packet_starting, bool is_packet_ending, uint32_t &total_len)
+RsItem *pqistreamer::addPartialPacket(const void *block, uint32_t len, uint32_t slice_packet_id, bool is_packet_starting, bool is_packet_ending, uint32_t &total_len)
 {
 #ifdef DEBUG_PACKET_SLICING
     std::cerr << "Receiving partial packet. size=" << len << ", ID=" << std::hex << slice_packet_id << std::dec << ", starting:" << is_packet_starting << ", ending:" << is_packet_ending ;

@@ -1134,7 +1152,7 @@ int pqistreamer::outAllowedBytes_locked()
     // low pass filter on mAvgDtOut
     mAvgDtOut = PQISTREAM_AVG_DT_FRAC * mAvgDtOut + (1 - PQISTREAM_AVG_DT_FRAC) * dt;
 
-    double maxout = getMaxRate(false) * 1024.0;
+    double maxout = getMaxRate_locked(false) * 1024.0;
 
     // this is used to take into account a possible excess of data sent during the previous round
     mCurrSent -= int(dt * maxout);

@@ -1156,7 +1174,7 @@ int pqistreamer::outAllowedBytes_locked()
     return quota;
 }
 
-int pqistreamer::inAllowedBytes_locked()
+int pqistreamer::inAllowedBytes()
 {
     double t = getCurrentTS(); // in sec, with high accuracy
 

@@ -1194,7 +1212,7 @@ int pqistreamer::inAllowedBytes_locked()
 
 #ifdef DEBUG_PQISTREAMER
     uint64_t t_now = 1000 * getCurrentTS();
-    std::cerr << std::dec << t_now << " DEBUG_PQISTREAMER pqistreamer::inAllowedBytes_locked PeerId " << this->PeerId().toStdString() << " dt " << (int)(1000 * dt) << "ms, mAvgDtIn " << (int)(1000 * mAvgDtIn) << "ms, maxin " << (int)(maxin) << " bytes/s, mCurrRead " << mCurrRead << " bytes, quota " << (int)(quota) << " bytes" << std::endl;
+    std::cerr << std::dec << t_now << " DEBUG_PQISTREAMER pqistreamer::inAllowedBytes PeerId " << this->PeerId().toStdString() << " dt " << (int)(1000 * dt) << "ms, mAvgDtIn " << (int)(1000 * mAvgDtIn) << "ms, maxin " << (int)(maxin) << " bytes/s, mCurrRead " << mCurrRead << " bytes, quota " << (int)(quota) << " bytes" << std::endl;
 #endif
 
     return quota;

@@ -1231,7 +1249,7 @@ void pqistreamer::outSentBytes_locked(uint32_t outb)
     return;
 }
 
-void pqistreamer::inReadBytes_locked(uint32_t inb)
+void pqistreamer::inReadBytes(uint32_t inb)
 {
 #ifdef DEBUG_PQISTREAMER
     {

@@ -1248,7 +1266,7 @@ void pqistreamer::inReadBytes_locked(uint32_t inb)
     return;
 }
 
-void pqistreamer::allocate_rpend_locked()
+void pqistreamer::allocate_rpend()
 {
     if(mPkt_rpending)
         return;

@@ -1271,17 +1289,17 @@ int pqistreamer::reset()
 #ifdef DEBUG_PQISTREAMER
     std::cerr << "pqistreamer::reset()" << std::endl;
 #endif
-    free_pend_locked();
+    free_pend();
 
     return 1 ;
 }
 
-void pqistreamer::free_pend_locked()
+void pqistreamer::free_pend()
 {
     if(mPkt_rpending)
     {
 #ifdef DEBUG_PQISTREAMER
-        std::cerr << "pqistreamer::free_pend_locked(): pending input packet buffer" << std::endl;
+        std::cerr << "pqistreamer::free_pend(): pending input packet buffer" << std::endl;
 #endif
         free(mPkt_rpending);
         mPkt_rpending = 0;

@@ -1291,7 +1309,7 @@ void pqistreamer::free_pend_locked()
     if (mPkt_wpending)
     {
 #ifdef DEBUG_PQISTREAMER
-        std::cerr << "pqistreamer::free_pend_locked(): pending output packet buffer" << std::endl;
+        std::cerr << "pqistreamer::free_pend(): pending output packet buffer" << std::endl;
 #endif
         free(mPkt_wpending);
         mPkt_wpending = NULL;

@@ -1300,7 +1318,7 @@ void pqistreamer::free_pend_locked()
 
 #ifdef DEBUG_PQISTREAMER
     if(!mPartialPackets.empty())
-        std::cerr << "pqistreamer::free_pend_locked(): " << mPartialPackets.size() << " pending input partial packets" << std::endl;
+        std::cerr << "pqistreamer::free_pend(): " << mPartialPackets.size() << " pending input partial packets" << std::endl;
 #endif
     // also delete any incoming partial packet
     for(std::map<uint32_t,PartialPacketRecord>::iterator it(mPartialPackets.begin());it!=mPartialPackets.end();++it)

@@ -1318,26 +1336,47 @@ int pqistreamer::gatherStatistics(std::list<RSTrafficClue>& outqueue_lst,std
 
     return locked_gatherStatistics(outqueue_lst,inqueue_lst);
 }
 
+// this method is overloaded by pqiqosstreamer
 int pqistreamer::getQueueSize(bool in)
 {
-    RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
-
     if (in)
+        // no mutex is needed here because this is atomic
         return mIncomingSize;
     else
+    {
+        RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
         return locked_out_queue_size();
+    }
 }
+
+int pqistreamer::getQueueSize_bytes(bool in)
+{
+    if (in)
+        // no mutex is needed here because this is atomic
+        // for future use, mIncomingSize_bytes is not updated yet
+        return mIncomingSize_bytes;
+    else
+    {
+        RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
+        return locked_compute_out_pkt_size();
+    }
+}
 
 void pqistreamer::getRates(RsBwRates &rates)
 {
     RateInterface::getRates(rates);
 
-    RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
+    // no mutex is needed here because this is atomic
 
     rates.mQueueIn = mIncomingSize;
 
+    {
+        RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
         rates.mQueueOut = locked_out_queue_size();
+    }
 }
 
+// this method is overloaded by pqiqosstreamer
 int pqistreamer::locked_out_queue_size() const
 {
     // Warning: because out_pkt is a list, calling size

@@ -1347,6 +1386,7 @@ int pqistreamer::locked_out_queue_size() const
     return mOutPkts.size() ;
 }
 
+// this method is overloaded by pqiqosstreamer
 void pqistreamer::locked_clear_out_queue()
 {
     for(std::list<void*>::iterator it = mOutPkts.begin(); it != mOutPkts.end(); )

@@ -1361,6 +1401,7 @@ void pqistreamer::locked_clear_out_queue()
     }
 }
 
+// this method is overloaded by pqiqosstreamer
 int pqistreamer::locked_compute_out_pkt_size() const
 {
     int total = 0 ;

@@ -1379,6 +1420,7 @@ int pqistreamer::locked_gatherStatistics(std::list<RSTrafficClue>& out_lst,std::
     return 1 ;
 }
 
+// this method is overloaded by pqiqosstreamer
 void *pqistreamer::locked_pop_out_data(uint32_t /*max_slice_size*/, uint32_t &size, bool &starts, bool &ends, uint32_t &packet_id)
 {
     size = 0 ;

@@ -1400,4 +1442,3 @@ void *pqistreamer::locked_pop_out_data(uint32_t /*max_slice_size*/, uint32_t &si
     return res ;
 }
 
-
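Two conventions run through the pqistreamer changes. First, public accessors such as getMaxRate() and setMaxRate() now take mStreamerMtx and forward to a *_locked() variant, so code that already holds the mutex (outAllowedBytes_locked(), for example) calls the _locked form and cannot deadlock on the non-recursive mutex. Second, counters with a single writer and word-sized reads (mIncomingSize, mLastIncomingTs) are read without the lock, as the in-code comments explain. A condensed sketch of the first convention, with illustrative names:

    #include <mutex>

    class Streamer
    {
    public:
        float getMaxRate()                  // public entry point: takes the lock
        {
            std::lock_guard<std::mutex> lock(mMtx);
            return getMaxRate_locked();
        }

    private:
        float getMaxRate_locked()           // call only with mMtx already held
        {
            return mMaxRate;
        }

        int outAllowedBytes_locked()        // already runs under mMtx, so it must
        {                                   // use the _locked variant, not getMaxRate()
            return int(getMaxRate_locked() * 1024.0f);
        }

        std::mutex mMtx;
        float mMaxRate = 0.0f;
    };

The unlocked reads in the second convention trade portability for speed: they assume an aligned word-sized load is never torn, which is true in practice on common platforms but is not guaranteed by the C++ memory model; std::atomic would express the same intent portably.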
|
libretroshare/src/pqi/pqistreamer.h

@@ -68,12 +68,17 @@ class pqistreamer: public PQInterface
     rstime_t getLastIncomingTS(); // Time of last data packet, for checking a connection is alive.
     virtual void getRates(RsBwRates &rates);
     virtual int getQueueSize(bool in); // extracting data.
+    virtual int getQueueSize_bytes(bool in); // size of incoming queue in bytes
     virtual int gatherStatistics(std::list<RSTrafficClue>& outqueue_stats,std::list<RSTrafficClue>& inqueue_stats); // extracting data.
 
     // mutex protected versions of RateInterface calls.
     virtual void setRate(bool b,float f) ;
     virtual void setMaxRate(bool b,float f) ;
+    virtual void setMaxRate_locked(bool b,float f) ;
+
     virtual float getRate(bool b) ;
+    virtual float getMaxRate(bool b) ;
+    virtual float getMaxRate_locked(bool b);
 
 protected:
     virtual int reset() ;

@@ -104,12 +109,12 @@ class pqistreamer: public PQInterface
 
 private:
     int queue_outpqi_locked(RsItem *i,uint32_t& serialized_size);
-    int handleincomingitem_locked(RsItem *i, int len);
+    int handleincomingitem(RsItem *i, int len);
 
     // ticked regularly (manages out queues and sending
     // via above interfaces.
     virtual int handleoutgoing_locked();
-    virtual int handleincoming_locked();
+    virtual int handleincoming();
 
     // Bandwidth/Streaming Management.
     float outTimeSlice_locked();

@@ -117,11 +122,11 @@ class pqistreamer: public PQInterface
     int outAllowedBytes_locked();
     void outSentBytes_locked(uint32_t );
 
-    int inAllowedBytes_locked();
-    void inReadBytes_locked(uint32_t );
+    int inAllowedBytes();
+    void inReadBytes(uint32_t );
 
     // cleans up everything that's pending / half finished.
-    void free_pend_locked();
+    void free_pend();
 
     // RsSerialiser - determines which packets can be serialised.
     RsSerialiser *mRsSerialiser;

@@ -129,13 +134,12 @@ class pqistreamer: public PQInterface
     void *mPkt_wpending; // storage for pending packet to write.
     uint32_t mPkt_wpending_size; // ... and its size.
 
-    void allocate_rpend_locked(); // use these two functions to allocate/free the buffer below
+    void allocate_rpend(); // use these two functions to allocate/free the buffer below
 
     int mPkt_rpend_size; // size of pkt_rpending.
     void *mPkt_rpending; // storage for read in pending packets.
 
-    enum {reading_state_packet_started=1,
-        reading_state_initial=0 } ;
+    enum {reading_state_packet_started=1, reading_state_initial=0 } ;
 
     int mReading_state ;
     int mFailed_read_attempts ;

@@ -145,6 +149,7 @@ class pqistreamer: public PQInterface
     std::list<RsItem *> mIncoming;
 
     uint32_t mIncomingSize; // size of mIncoming. To avoid calling linear cost std::list::size()
+    uint32_t mIncomingSize_bytes; // size of Incoming in btyes
 
     // data for network stats.
     int mTotalRead;

@@ -177,7 +182,7 @@ class pqistreamer: public PQInterface
     bool mAcceptsPacketSlicing ;
     rstime_t mLastSentPacketSlicingProbe ;
     void locked_addTrafficClue(const RsItem *pqi, uint32_t pktsize, std::list<RSTrafficClue> &lst);
-    RsItem *addPartialPacket_locked(const void *block, uint32_t len, uint32_t slice_packet_id,bool packet_starting,bool packet_ending,uint32_t& total_len);
+    RsItem *addPartialPacket(const void *block, uint32_t len, uint32_t slice_packet_id,bool packet_starting,bool packet_ending,uint32_t& total_len);
 
     std::map<uint32_t,PartialPacketRecord> mPartialPackets ;
 };
|
libretroshare/src/pqi/pqithreadstreamer.cc

@@ -23,8 +23,8 @@
 #include "pqi/pqithreadstreamer.h"
 #include <unistd.h>
 
-#define DEFAULT_STREAMER_TIMEOUT 10000 // 10 ms.
-#define DEFAULT_STREAMER_SLEEP 1000 // 1 ms.
+#define DEFAULT_STREAMER_TIMEOUT 10000 // 10 ms
+#define DEFAULT_STREAMER_SLEEP 30000 // 30 ms
 #define DEFAULT_STREAMER_IDLE_SLEEP 1000000 // 1 sec
 
 // #define PQISTREAMER_DEBUG

@@ -43,7 +43,8 @@ bool pqithreadstreamer::RecvItem(RsItem *item)
 
 int pqithreadstreamer::tick()
 {
-    RsStackMutex stack(mThreadMutex);
+    // pqithreadstreamer mutex lock is not needed here
+    // we will only check if the connection is active, and if not we will try to establish it
     tick_bio();
 
     return 0;

@@ -54,6 +55,7 @@ void pqithreadstreamer::threadTick()
     uint32_t recv_timeout = 0;
     uint32_t sleep_period = 0;
     bool isactive = false;
 
     {
         RsStackMutex stack(mStreamerMtx);
         recv_timeout = mTimeout;

@@ -61,37 +63,39 @@ void pqithreadstreamer::threadTick()
         isactive = mBio->isactive();
     }
 
+    // update the connection rates
     updateRates() ;
 
+    // if the connection est not active, long sleep then return
     if (!isactive)
     {
         rstime::rs_usleep(DEFAULT_STREAMER_IDLE_SLEEP);
         return ;
     }
 
+    // fill incoming queue with items from SSL
     {
         RsStackMutex stack(mThreadMutex);
         tick_recv(recv_timeout);
     }
 
-    // Push Items, Outside of Mutex.
+    // move items to appropriate service queue or shortcut to fast service
     RsItem *incoming = NULL;
     while((incoming = GetItem()))
     {
         RecvItem(incoming);
     }
 
+    // parse the outgoing queue and send items to SSL
     {
         RsStackMutex stack(mThreadMutex);
         tick_send(0);
     }
 
+    // sleep
     if (sleep_period)
     {
         rstime::rs_usleep(sleep_period);
     }
 }
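The reworked threadTick() keeps a simple shape: pull bytes from the wire under mThreadMutex, hand completed items to the services with no lock held, flush the outgoing queue under the mutex again, then sleep (now 30 ms by default instead of 1 ms). A skeleton of that loop with stand-in types; only the structure mirrors the diff:

    #include <chrono>
    #include <mutex>
    #include <thread>

    class ThreadStreamer
    {
    public:
        void threadTick()
        {
            {
                std::lock_guard<std::mutex> lock(mThreadMtx);
                tickRecv();                 // fill the incoming queue from the socket
            }

            // dispatch parsed items off-mutex so a slow service cannot block I/O
            while (void *item = getItem())
                recvItem(item);

            {
                std::lock_guard<std::mutex> lock(mThreadMtx);
                tickSend();                 // push queued outgoing packets to the socket
            }

            std::this_thread::sleep_for(std::chrono::microseconds(mSleepPeriodUs));
        }

    private:
        void tickRecv() {}
        void tickSend() {}
        void *getItem() { return nullptr; } // placeholder: pops one parsed item, or null
        void recvItem(void *) {}

        std::mutex mThreadMtx;
        unsigned mSleepPeriodUs = 30000;    // matches the new DEFAULT_STREAMER_SLEEP (30 ms)
    };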
|
rsserver: RsServer::threadTick()

@@ -143,13 +143,14 @@ void RsServer::threadTick()
     // if there is time left, we sleep
     double timeToSleep = mTickInterval - mAvgRunDuration;
 
-    if (timeToSleep > 0)
-    {
+    // never sleep less than 50 ms
+    if (timeToSleep < 0.050)
+        timeToSleep = 0.050;
+
 #ifdef TICK_DEBUG
-    RsDbg() << "TICK_DEBUG will sleep " << timeToSleep << " ms" << std::endl;
+    RsDbg() << "TICK_DEBUG will sleep " << (int) (1000 * timeToSleep) << " ms" << std::endl;
 #endif
     rstime::rs_usleep(timeToSleep * 1000000);
-    }
 
     double ts = getCurrentTS();
     mLastts = ts;

@@ -229,12 +230,16 @@ void RsServer::threadTick()
     // ticking is done, now compute new values of mLastRunDuration, mAvgRunDuration and mTickInterval
     ts = getCurrentTS();
     mLastRunDuration = ts - mLastts;
 
+    // low-pass filter and don't let mAvgRunDuration exceeds maxTickInterval
     mAvgRunDuration = 0.1 * mLastRunDuration + 0.9 * mAvgRunDuration;
+    if (mAvgRunDuration > maxTickInterval)
+        mAvgRunDuration = maxTickInterval;
+
 #ifdef TICK_DEBUG
     RsDbg() << "TICK_DEBUG new mLastRunDuration " << mLastRunDuration << " mAvgRunDuration " << mAvgRunDuration << std::endl;
     if (mLastRunDuration > WARN_BIG_CYCLE_TIME)
-        RsDbg() << "TICK_DEBUG excessively long lycle time " << mLastRunDuration << std::endl;
+        RsDbg() << "TICK_DEBUG excessively long cycle time " << mLastRunDuration << std::endl;
 #endif
 
     // if the core has returned that there is more to tick we decrease the ticking interval, else we increse it

@@ -250,7 +255,7 @@ void RsServer::threadTick()
     RsDbg() << "TICK_DEBUG new tick interval " << mTickInterval << std::endl;
 #endif
 
-    // keep the tick interval within allowed limits
+    // keep the tick interval target within allowed limits
     if (mTickInterval < minTickInterval)
         mTickInterval = minTickInterval;
     else if (mTickInterval > maxTickInterval)