added packet packing in pqistreamer. To be tested for improvement in bw

This commit is contained in:
csoler 2015-12-12 11:52:48 -05:00
parent 893f178ce1
commit f6a84aa4ad

View File

@ -416,104 +416,127 @@ time_t pqistreamer::getLastIncomingTS()
int pqistreamer::handleoutgoing_locked() int pqistreamer::handleoutgoing_locked()
{ {
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::handleoutgoing_locked()"); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::handleoutgoing_locked()");
#endif #endif
int maxbytes = outAllowedBytes_locked(); int maxbytes = outAllowedBytes_locked();
int sentbytes = 0; int sentbytes = 0;
int len; int len;
int ss; int ss;
// std::cerr << "pqistreamer: maxbytes=" << maxbytes<< std::endl ; // std::cerr << "pqistreamer: maxbytes=" << maxbytes<< std::endl ;
std::list<void *>::iterator it; std::list<void *>::iterator it;
// if not connection, or cannot send anything... pause. // if not connection, or cannot send anything... pause.
if (!(mBio->isactive())) if (!(mBio->isactive()))
{ {
/* if we are not active - clear anything in the queues. */ /* if we are not active - clear anything in the queues. */
locked_clear_out_queue() ; locked_clear_out_queue() ;
/* also remove the pending packets */ /* also remove the pending packets */
if (mPkt_wpending) if (mPkt_wpending)
{ {
free(mPkt_wpending); free(mPkt_wpending);
mPkt_wpending = NULL; mPkt_wpending = NULL;
} }
outSentBytes_locked(sentbytes); outSentBytes_locked(sentbytes);
return 0; return 0;
} }
// a very simple round robin // a very simple round robin
bool sent = true; bool sent = true;
while(sent) // catch if all items sent. int nsent = 0 ;
{ while(sent) // catch if all items sent.
sent = false; {
sent = false;
if ((!(mBio->cansend(0))) || (maxbytes < sentbytes)) if ((!(mBio->cansend(0))) || (maxbytes < sentbytes))
{ {
#ifdef DEBUG_TRANSFERS #ifdef DEBUG_TRANSFERS
if (maxbytes < sentbytes) if (maxbytes < sentbytes)
{ {
std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending sentbytes > maxbytes. Sent " << sentbytes << " bytes "; std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending sentbytes > maxbytes. Sent " << sentbytes << " bytes ";
std::cerr << std::endl; std::cerr << std::endl;
} }
else else
{ {
std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending at cansend() is false"; std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending at cansend() is false";
std::cerr << std::endl; std::cerr << std::endl;
} }
#endif #endif
outSentBytes_locked(sentbytes); outSentBytes_locked(sentbytes);
return 0; return 0;
} }
#define GROUP_OUTGOING_PACKETS 1
// send a out_pkt., else send out_data. unless // send a out_pkt., else send out_data. unless
// there is a pending packet. // there is a pending packet.
if (!mPkt_wpending) if (!mPkt_wpending)
mPkt_wpending = locked_pop_out_data() ; #ifdef GROUP_OUTGOING_PACKETS
{
if (mPkt_wpending) void *dta;
{ len = 0 ;
// write packet. int k=0;
len = getRsItemSize(mPkt_wpending); while(len < maxbytes && (dta = locked_pop_out_data())!=NULL )
{
uint32_t s = getRsItemSize(dta);
mPkt_wpending = realloc(mPkt_wpending,s+len) ;
memcpy(mPkt_wpending+len,dta,s) ;
free(dta);
len += s ;
++k ;
}
if(k > 1)
std::cerr << "Packed " << k << " packets into " << len << " bytes." << std::endl;
}
#else
{
mPkt_wpending = locked_pop_out_data() ;
len = getRsItemSize(mPkt_wpending);
}
#endif
if (mPkt_wpending)
{
// write packet.
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
std::cout << "Sending Out Pkt of size " << len << " !" << std::endl; std::cout << "Sending Out Pkt of size " << len << " !" << std::endl;
#endif #endif
if (len != (ss = mBio->senddata(mPkt_wpending, len))) if (len != (ss = mBio->senddata(mPkt_wpending, len)))
{ {
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
std::string out; std::string out;
rs_sprintf(out, "Problems with Send Data! (only %d bytes sent, total pkt size=%d)", ss, len); rs_sprintf(out, "Problems with Send Data! (only %d bytes sent, total pkt size=%d)", ss, len);
// std::cerr << out << std::endl ; // std::cerr << out << std::endl ;
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out); pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out);
#endif #endif
outSentBytes_locked(sentbytes); outSentBytes_locked(sentbytes);
// pkt_wpending will kept til next time. // pkt_wpending will kept til next time.
// ensuring exactly the same data is written (openSSL requirement). // ensuring exactly the same data is written (openSSL requirement).
return -1; return -1;
} }
++nsent;
#ifdef DEBUG_TRANSFERS #ifdef DEBUG_TRANSFERS
std::cerr << "pqistreamer::handleoutgoing_locked() Sent Packet len: " << len << " @ " << RsUtil::AccurateTimeString(); std::cerr << "pqistreamer::handleoutgoing_locked() Sent Packet len: " << len << " @ " << RsUtil::AccurateTimeString();
std::cerr << std::endl; std::cerr << std::endl;
#endif #endif
free(mPkt_wpending); free(mPkt_wpending);
mPkt_wpending = NULL; mPkt_wpending = NULL;
sentbytes += len; sentbytes += len;
sent = true; sent = true;
} }
} }
outSentBytes_locked(sentbytes); if(nsent > 0)
return 1; std::cerr << "nsent = " << nsent << ", total bytes=" << sentbytes << std::endl;
outSentBytes_locked(sentbytes);
return 1;
} }