Merge pull request #212 from csoler/v0.6-TrafficOptim

V0.6 traffic optim
This commit is contained in:
Cyril Soler 2015-12-19 21:38:55 -05:00
commit d50875b9bb
2 changed files with 120 additions and 82 deletions

View File

@ -54,7 +54,7 @@ const int PQISTREAM_ABS_MAX = 100000000; /* 100 MB/sec (actually per loop) */
pqistreamer::pqistreamer(RsSerialiser *rss, const RsPeerId& id, BinInterface *bio_in, int bio_flags_in)
:PQInterface(id), mStreamerMtx("pqistreamer"),
mBio(bio_in), mBio_flags(bio_flags_in), mRsSerialiser(rss),
mPkt_wpending(NULL),
mPkt_wpending(NULL), mPkt_wpending_size(0),
mTotalRead(0), mTotalSent(0),
mCurrRead(0), mCurrSent(0),
mAvgReadCount(0), mAvgSentCount(0)
@ -117,6 +117,7 @@ pqistreamer::~pqistreamer()
{
free(mPkt_wpending);
mPkt_wpending = NULL;
mPkt_wpending_size = 0 ;
}
free_rpend_locked();
@ -416,104 +417,140 @@ time_t pqistreamer::getLastIncomingTS()
int pqistreamer::handleoutgoing_locked()
{
#ifdef DEBUG_PQISTREAMER
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::handleoutgoing_locked()");
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::handleoutgoing_locked()");
#endif
int maxbytes = outAllowedBytes_locked();
int sentbytes = 0;
int len;
int ss;
// std::cerr << "pqistreamer: maxbytes=" << maxbytes<< std::endl ;
int maxbytes = outAllowedBytes_locked();
int sentbytes = 0;
std::list<void *>::iterator it;
// std::cerr << "pqistreamer: maxbytes=" << maxbytes<< std::endl ;
// if not connection, or cannot send anything... pause.
if (!(mBio->isactive()))
{
/* if we are not active - clear anything in the queues. */
locked_clear_out_queue() ;
std::list<void *>::iterator it;
/* also remove the pending packets */
if (mPkt_wpending)
{
free(mPkt_wpending);
mPkt_wpending = NULL;
}
// if not connection, or cannot send anything... pause.
if (!(mBio->isactive()))
{
/* if we are not active - clear anything in the queues. */
locked_clear_out_queue() ;
outSentBytes_locked(sentbytes);
return 0;
}
/* also remove the pending packets */
if (mPkt_wpending)
{
free(mPkt_wpending);
mPkt_wpending = NULL;
mPkt_wpending_size = 0 ;
}
// a very simple round robin
outSentBytes_locked(sentbytes);
return 0;
}
bool sent = true;
while(sent) // catch if all items sent.
{
sent = false;
// a very simple round robin
if ((!(mBio->cansend(0))) || (maxbytes < sentbytes))
{
bool sent = true;
int nsent = 0 ;
while(sent) // catch if all items sent.
{
sent = false;
if ((!(mBio->cansend(0))) || (maxbytes < sentbytes))
{
#ifdef DEBUG_TRANSFERS
if (maxbytes < sentbytes)
{
std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending sentbytes > maxbytes. Sent " << sentbytes << " bytes ";
std::cerr << std::endl;
}
else
{
std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending at cansend() is false";
std::cerr << std::endl;
}
if (maxbytes < sentbytes)
{
std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending sentbytes > maxbytes. Sent " << sentbytes << " bytes ";
std::cerr << std::endl;
}
else
{
std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending at cansend() is false";
std::cerr << std::endl;
}
#endif
outSentBytes_locked(sentbytes);
return 0;
}
outSentBytes_locked(sentbytes);
return 0;
}
#define GROUP_OUTGOING_PACKETS 1
// send a out_pkt., else send out_data. unless
// there is a pending packet.
if (!mPkt_wpending)
#ifdef GROUP_OUTGOING_PACKETS
{
void *dta;
mPkt_wpending_size = 0 ;
int k=0;
// send a out_pkt., else send out_data. unless
// there is a pending packet.
if (!mPkt_wpending)
mPkt_wpending = locked_pop_out_data() ;
while(mPkt_wpending_size < maxbytes && (dta = locked_pop_out_data())!=NULL )
{
uint32_t s = getRsItemSize(dta);
mPkt_wpending = realloc(mPkt_wpending,s+mPkt_wpending_size) ;
memcpy(mPkt_wpending+mPkt_wpending_size,dta,s) ;
free(dta);
mPkt_wpending_size += s ;
++k ;
}
#ifdef DEBUG_PQISTREAMER
if(k > 1)
std::cerr << "Packed " << k << " packets into " << mPkt_wpending_size << " bytes." << std::endl;
#endif
}
#else
{
void *dta = locked_pop_out_data() ;
if (mPkt_wpending)
if(dta != NULL)
{
// write packet.
len = getRsItemSize(mPkt_wpending);
#ifdef DEBUG_PQISTREAMER
std::cout << "Sending Out Pkt of size " << len << " !" << std::endl;
#endif
if (len != (ss = mBio->senddata(mPkt_wpending, len)))
{
#ifdef DEBUG_PQISTREAMER
std::string out;
rs_sprintf(out, "Problems with Send Data! (only %d bytes sent, total pkt size=%d)", ss, len);
// std::cerr << out << std::endl ;
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out);
#endif
outSentBytes_locked(sentbytes);
// pkt_wpending will kept til next time.
// ensuring exactly the same data is written (openSSL requirement).
return -1;
}
#ifdef DEBUG_TRANSFERS
std::cerr << "pqistreamer::handleoutgoing_locked() Sent Packet len: " << len << " @ " << RsUtil::AccurateTimeString();
std::cerr << std::endl;
#endif
free(mPkt_wpending);
mPkt_wpending = NULL;
sentbytes += len;
sent = true;
mPkt_wpending = dta ;
mPkt_wpending_size = getRsItemSize(dta);
}
}
outSentBytes_locked(sentbytes);
return 1;
#endif
if (mPkt_wpending)
{
// write packet.
#ifdef DEBUG_PQISTREAMER
std::cout << "Sending Out Pkt of size " << mPkt_wpending_size << " !" << std::endl;
#endif
int ss=0;
if (mPkt_wpending_size != (ss = mBio->senddata(mPkt_wpending, mPkt_wpending_size)))
{
#ifdef DEBUG_PQISTREAMER
std::string out;
rs_sprintf(out, "Problems with Send Data! (only %d bytes sent, total pkt size=%d)", ss, mPkt_wpending_size);
// std::cerr << out << std::endl ;
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out);
#endif
outSentBytes_locked(sentbytes);
// pkt_wpending will kept til next time.
// ensuring exactly the same data is written (openSSL requirement).
return -1;
}
++nsent;
#ifdef DEBUG_TRANSFERS
std::cerr << "pqistreamer::handleoutgoing_locked() Sent Packet len: " << mPkt_wpending_size << " @ " << RsUtil::AccurateTimeString();
std::cerr << std::endl;
#endif
sentbytes += mPkt_wpending_size;
free(mPkt_wpending);
mPkt_wpending = NULL;
mPkt_wpending_size = 0 ;
sent = true;
}
}
#ifdef DEBUG_PQISTREAMER
if(nsent > 0)
std::cerr << "nsent = " << nsent << ", total bytes=" << sentbytes << std::endl;
#endif
outSentBytes_locked(sentbytes);
return 1;
}

View File

@ -113,6 +113,7 @@ class pqistreamer: public PQInterface
RsSerialiser *mRsSerialiser;
void *mPkt_wpending; // storage for pending packet to write.
uint32_t mPkt_wpending_size; // ... and its size.
void allocate_rpend_locked(); // use these two functions to allocate/free the buffer below
void free_rpend_locked();