fixed a few bugs in packet packing in pqistreamer.

This commit is contained in:
csoler 2015-12-12 23:07:33 -05:00
parent f6a84aa4ad
commit 82d43eb8a0
2 changed files with 29 additions and 18 deletions

View File

@ -54,7 +54,7 @@ const int PQISTREAM_ABS_MAX = 100000000; /* 100 MB/sec (actually per loop) */
pqistreamer::pqistreamer(RsSerialiser *rss, const RsPeerId& id, BinInterface *bio_in, int bio_flags_in) pqistreamer::pqistreamer(RsSerialiser *rss, const RsPeerId& id, BinInterface *bio_in, int bio_flags_in)
:PQInterface(id), mStreamerMtx("pqistreamer"), :PQInterface(id), mStreamerMtx("pqistreamer"),
mBio(bio_in), mBio_flags(bio_flags_in), mRsSerialiser(rss), mBio(bio_in), mBio_flags(bio_flags_in), mRsSerialiser(rss),
mPkt_wpending(NULL), mPkt_wpending(NULL), mPkt_wpending_size(0),
mTotalRead(0), mTotalSent(0), mTotalRead(0), mTotalSent(0),
mCurrRead(0), mCurrSent(0), mCurrRead(0), mCurrSent(0),
mAvgReadCount(0), mAvgSentCount(0) mAvgReadCount(0), mAvgSentCount(0)
@ -117,6 +117,7 @@ pqistreamer::~pqistreamer()
{ {
free(mPkt_wpending); free(mPkt_wpending);
mPkt_wpending = NULL; mPkt_wpending = NULL;
mPkt_wpending_size = 0 ;
} }
free_rpend_locked(); free_rpend_locked();
@ -421,8 +422,7 @@ int pqistreamer::handleoutgoing_locked()
int maxbytes = outAllowedBytes_locked(); int maxbytes = outAllowedBytes_locked();
int sentbytes = 0; int sentbytes = 0;
int len;
int ss;
// std::cerr << "pqistreamer: maxbytes=" << maxbytes<< std::endl ; // std::cerr << "pqistreamer: maxbytes=" << maxbytes<< std::endl ;
std::list<void *>::iterator it; std::list<void *>::iterator it;
@ -438,6 +438,7 @@ int pqistreamer::handleoutgoing_locked()
{ {
free(mPkt_wpending); free(mPkt_wpending);
mPkt_wpending = NULL; mPkt_wpending = NULL;
mPkt_wpending_size = 0 ;
} }
outSentBytes_locked(sentbytes); outSentBytes_locked(sentbytes);
@ -478,38 +479,45 @@ int pqistreamer::handleoutgoing_locked()
#ifdef GROUP_OUTGOING_PACKETS #ifdef GROUP_OUTGOING_PACKETS
{ {
void *dta; void *dta;
len = 0 ; mPkt_wpending_size = 0 ;
int k=0; int k=0;
while(len < maxbytes && (dta = locked_pop_out_data())!=NULL )
while(mPkt_wpending_size < maxbytes && (dta = locked_pop_out_data())!=NULL )
{ {
uint32_t s = getRsItemSize(dta); uint32_t s = getRsItemSize(dta);
mPkt_wpending = realloc(mPkt_wpending,s+len) ; mPkt_wpending = realloc(mPkt_wpending,s+mPkt_wpending_size) ;
memcpy(mPkt_wpending+len,dta,s) ; memcpy(mPkt_wpending+mPkt_wpending_size,dta,s) ;
free(dta); free(dta);
len += s ; mPkt_wpending_size += s ;
++k ; ++k ;
} }
if(k > 1) if(k > 1)
std::cerr << "Packed " << k << " packets into " << len << " bytes." << std::endl; std::cerr << "Packed " << k << " packets into " << mPkt_wpending_size << " bytes." << std::endl;
} }
#else #else
{ {
mPkt_wpending = locked_pop_out_data() ; void *dta = locked_pop_out_data() ;
len = getRsItemSize(mPkt_wpending);
if(dta != NULL)
{
mPkt_wpending = dta ;
mPkt_wpending_size = getRsItemSize(dta);
}
} }
#endif #endif
if (mPkt_wpending) if (mPkt_wpending)
{ {
// write packet. // write packet.
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
std::cout << "Sending Out Pkt of size " << len << " !" << std::endl; std::cout << "Sending Out Pkt of size " << mPkt_wpending_size << " !" << std::endl;
#endif #endif
int ss=0;
if (len != (ss = mBio->senddata(mPkt_wpending, len))) if (mPkt_wpending_size != (ss = mBio->senddata(mPkt_wpending, mPkt_wpending_size)))
{ {
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
std::string out; std::string out;
rs_sprintf(out, "Problems with Send Data! (only %d bytes sent, total pkt size=%d)", ss, len); rs_sprintf(out, "Problems with Send Data! (only %d bytes sent, total pkt size=%d)", ss, mPkt_wpending_size);
// std::cerr << out << std::endl ; // std::cerr << out << std::endl ;
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out); pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out);
#endif #endif
@ -522,14 +530,16 @@ int pqistreamer::handleoutgoing_locked()
++nsent; ++nsent;
#ifdef DEBUG_TRANSFERS #ifdef DEBUG_TRANSFERS
std::cerr << "pqistreamer::handleoutgoing_locked() Sent Packet len: " << len << " @ " << RsUtil::AccurateTimeString(); std::cerr << "pqistreamer::handleoutgoing_locked() Sent Packet len: " << mPkt_wpending_size << " @ " << RsUtil::AccurateTimeString();
std::cerr << std::endl; std::cerr << std::endl;
#endif #endif
sentbytes += mPkt_wpending_size;
free(mPkt_wpending); free(mPkt_wpending);
mPkt_wpending = NULL; mPkt_wpending = NULL;
mPkt_wpending_size = 0 ;
sentbytes += len;
sent = true; sent = true;
} }
} }

View File

@ -113,6 +113,7 @@ class pqistreamer: public PQInterface
RsSerialiser *mRsSerialiser; RsSerialiser *mRsSerialiser;
void *mPkt_wpending; // storage for pending packet to write. void *mPkt_wpending; // storage for pending packet to write.
uint32_t mPkt_wpending_size; // ... and its size.
void allocate_rpend_locked(); // use these two functions to allocate/free the buffer below void allocate_rpend_locked(); // use these two functions to allocate/free the buffer below
void free_rpend_locked(); void free_rpend_locked();