diff --git a/libretroshare/src/pqi/pqistreamer.cc b/libretroshare/src/pqi/pqistreamer.cc index f77884321..5deded797 100644 --- a/libretroshare/src/pqi/pqistreamer.cc +++ b/libretroshare/src/pqi/pqistreamer.cc @@ -54,7 +54,7 @@ const int PQISTREAM_ABS_MAX = 100000000; /* 100 MB/sec (actually per loop) */ pqistreamer::pqistreamer(RsSerialiser *rss, const RsPeerId& id, BinInterface *bio_in, int bio_flags_in) :PQInterface(id), mStreamerMtx("pqistreamer"), mBio(bio_in), mBio_flags(bio_flags_in), mRsSerialiser(rss), - mPkt_wpending(NULL), + mPkt_wpending(NULL), mPkt_wpending_size(0), mTotalRead(0), mTotalSent(0), mCurrRead(0), mCurrSent(0), mAvgReadCount(0), mAvgSentCount(0) @@ -117,6 +117,7 @@ pqistreamer::~pqistreamer() { free(mPkt_wpending); mPkt_wpending = NULL; + mPkt_wpending_size = 0 ; } free_rpend_locked(); @@ -416,104 +417,140 @@ time_t pqistreamer::getLastIncomingTS() int pqistreamer::handleoutgoing_locked() { #ifdef DEBUG_PQISTREAMER - pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::handleoutgoing_locked()"); + pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::handleoutgoing_locked()"); #endif - int maxbytes = outAllowedBytes_locked(); - int sentbytes = 0; - int len; - int ss; - // std::cerr << "pqistreamer: maxbytes=" << maxbytes<< std::endl ; + int maxbytes = outAllowedBytes_locked(); + int sentbytes = 0; + + // std::cerr << "pqistreamer: maxbytes=" << maxbytes<< std::endl ; - std::list::iterator it; + std::list::iterator it; - // if not connection, or cannot send anything... pause. - if (!(mBio->isactive())) - { - /* if we are not active - clear anything in the queues. */ - locked_clear_out_queue() ; + // if not connection, or cannot send anything... pause. + if (!(mBio->isactive())) + { + /* if we are not active - clear anything in the queues. */ + locked_clear_out_queue() ; - /* also remove the pending packets */ - if (mPkt_wpending) - { - free(mPkt_wpending); - mPkt_wpending = NULL; - } + /* also remove the pending packets */ + if (mPkt_wpending) + { + free(mPkt_wpending); + mPkt_wpending = NULL; + mPkt_wpending_size = 0 ; + } - outSentBytes_locked(sentbytes); - return 0; - } + outSentBytes_locked(sentbytes); + return 0; + } - // a very simple round robin + // a very simple round robin - bool sent = true; - while(sent) // catch if all items sent. - { - sent = false; + bool sent = true; + int nsent = 0 ; + while(sent) // catch if all items sent. + { + sent = false; - if ((!(mBio->cansend(0))) || (maxbytes < sentbytes)) - { + if ((!(mBio->cansend(0))) || (maxbytes < sentbytes)) + { #ifdef DEBUG_TRANSFERS - if (maxbytes < sentbytes) - { - std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending sentbytes > maxbytes. Sent " << sentbytes << " bytes "; - std::cerr << std::endl; - } - else - { - std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending at cansend() is false"; - std::cerr << std::endl; - } + if (maxbytes < sentbytes) + { + std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending sentbytes > maxbytes. Sent " << sentbytes << " bytes "; + std::cerr << std::endl; + } + else + { + std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending at cansend() is false"; + std::cerr << std::endl; + } #endif - outSentBytes_locked(sentbytes); - return 0; - } + outSentBytes_locked(sentbytes); + return 0; + } +#define GROUP_OUTGOING_PACKETS 1 + // send a out_pkt., else send out_data. unless + // there is a pending packet. + if (!mPkt_wpending) +#ifdef GROUP_OUTGOING_PACKETS + { + void *dta; + mPkt_wpending_size = 0 ; + int k=0; + + while(mPkt_wpending_size < maxbytes && (dta = locked_pop_out_data())!=NULL ) + { + uint32_t s = getRsItemSize(dta); + mPkt_wpending = realloc(mPkt_wpending,s+mPkt_wpending_size) ; + memcpy(mPkt_wpending+mPkt_wpending_size,dta,s) ; + free(dta); + mPkt_wpending_size += s ; + ++k ; + } +#ifdef DEBUG_PQISTREAMER + if(k > 1) + std::cerr << "Packed " << k << " packets into " << mPkt_wpending_size << " bytes." << std::endl; +#endif + } +#else + { + void *dta = locked_pop_out_data() ; - // send a out_pkt., else send out_data. unless - // there is a pending packet. - if (!mPkt_wpending) - mPkt_wpending = locked_pop_out_data() ; - - if (mPkt_wpending) + if(dta != NULL) { - // write packet. - len = getRsItemSize(mPkt_wpending); - -#ifdef DEBUG_PQISTREAMER - std::cout << "Sending Out Pkt of size " << len << " !" << std::endl; -#endif - - if (len != (ss = mBio->senddata(mPkt_wpending, len))) - { -#ifdef DEBUG_PQISTREAMER - std::string out; - rs_sprintf(out, "Problems with Send Data! (only %d bytes sent, total pkt size=%d)", ss, len); -// std::cerr << out << std::endl ; - pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out); -#endif - - outSentBytes_locked(sentbytes); - // pkt_wpending will kept til next time. - // ensuring exactly the same data is written (openSSL requirement). - return -1; - } - -#ifdef DEBUG_TRANSFERS - std::cerr << "pqistreamer::handleoutgoing_locked() Sent Packet len: " << len << " @ " << RsUtil::AccurateTimeString(); - std::cerr << std::endl; -#endif - - free(mPkt_wpending); - mPkt_wpending = NULL; - - sentbytes += len; - sent = true; + mPkt_wpending = dta ; + mPkt_wpending_size = getRsItemSize(dta); } } - outSentBytes_locked(sentbytes); - return 1; +#endif + if (mPkt_wpending) + { + // write packet. +#ifdef DEBUG_PQISTREAMER + std::cout << "Sending Out Pkt of size " << mPkt_wpending_size << " !" << std::endl; +#endif + int ss=0; + + if (mPkt_wpending_size != (ss = mBio->senddata(mPkt_wpending, mPkt_wpending_size))) + { +#ifdef DEBUG_PQISTREAMER + std::string out; + rs_sprintf(out, "Problems with Send Data! (only %d bytes sent, total pkt size=%d)", ss, mPkt_wpending_size); + // std::cerr << out << std::endl ; + pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out); +#endif + + outSentBytes_locked(sentbytes); + // pkt_wpending will kept til next time. + // ensuring exactly the same data is written (openSSL requirement). + return -1; + } + ++nsent; + +#ifdef DEBUG_TRANSFERS + std::cerr << "pqistreamer::handleoutgoing_locked() Sent Packet len: " << mPkt_wpending_size << " @ " << RsUtil::AccurateTimeString(); + std::cerr << std::endl; +#endif + + sentbytes += mPkt_wpending_size; + + free(mPkt_wpending); + mPkt_wpending = NULL; + mPkt_wpending_size = 0 ; + + sent = true; + } + } +#ifdef DEBUG_PQISTREAMER + if(nsent > 0) + std::cerr << "nsent = " << nsent << ", total bytes=" << sentbytes << std::endl; +#endif + outSentBytes_locked(sentbytes); + return 1; } diff --git a/libretroshare/src/pqi/pqistreamer.h b/libretroshare/src/pqi/pqistreamer.h index 37d352a64..329a4a897 100644 --- a/libretroshare/src/pqi/pqistreamer.h +++ b/libretroshare/src/pqi/pqistreamer.h @@ -113,6 +113,7 @@ class pqistreamer: public PQInterface RsSerialiser *mRsSerialiser; void *mPkt_wpending; // storage for pending packet to write. + uint32_t mPkt_wpending_size; // ... and its size. void allocate_rpend_locked(); // use these two functions to allocate/free the buffer below void free_rpend_locked();