/* * "$Id: pqistreamer.cc,v 1.19 2007-02-18 21:46:50 rmf24 Exp $" * * 3P/PQI network interface for RetroShare. * * Copyright 2004-2006 by Robert Fernie. * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License Version 2 as published by the Free Software Foundation. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 * USA. * * Please report all bugs and problems to "retroshare@lunamutt.com". * */ #include #include #include "util/rsdebug.h" #include "util/rsstring.h" #include "pqi/pqistreamer.h" #include "pqi/pqinotify.h" #include "serialiser/rsserial.h" #include "serialiser/rsbaseitems.h" /***** For RsFileData *****/ const int pqistreamerzone = 8221; const int PQISTREAM_ABS_MAX = 100000000; /* 100 MB/sec (actually per loop) */ /* This removes the print statements (which hammer pqidebug) */ /*** #define RSITEM_DEBUG 1 #define DEBUG_TRANSFERS 1 #define DEBUG_PQISTREAMER 1 ***/ #ifdef DEBUG_TRANSFERS #include "util/rsprint.h" #endif pqistreamer::pqistreamer(RsSerialiser *rss, std::string id, BinInterface *bio_in, int bio_flags_in) :PQInterface(id), rsSerialiser(rss), bio(bio_in), bio_flags(bio_flags_in), pkt_wpending(NULL), totalRead(0), totalSent(0), currRead(0), currSent(0), avgReadCount(0), avgSentCount(0), streamerMtx("pqistreamer") { avgLastUpdate = currReadTS = currSentTS = time(NULL); /* allocated once */ pkt_rpend_size = getRsPktMaxSize(); pkt_rpending = malloc(pkt_rpend_size); reading_state = reading_state_initial ; // thread_id = pthread_self() ; // avoid uninitialized (and random) memory read. memset(pkt_rpending,0,pkt_rpend_size) ; // 100 B/s (minimal) setMaxRate(true, 0.1); setMaxRate(false, 0.1); setRate(true, 0); setRate(false, 0); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::pqistreamer() Initialisation!"); if (!bio_in) { pqioutput(PQL_ALERT, pqistreamerzone, "pqistreamer::pqistreamer() NULL bio, FATAL ERROR!"); exit(1); } failed_read_attempts = 0 ; // reset failed read, as no packet is still read. return; } pqistreamer::~pqistreamer() { RsStackMutex stack(streamerMtx) ; // lock out_pkt and out_data pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::~pqistreamer() Destruction!"); if (bio_flags & BIN_FLAGS_NO_CLOSE) { pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::~pqistreamer() Not Closing BinInterface!"); } else if (bio) { pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::~pqistreamer() Deleting BinInterface!"); delete bio; } /* clean up serialiser */ if (rsSerialiser) delete rsSerialiser; // clean up outgoing. (cntrl packets) locked_clear_out_queue() ; if (pkt_wpending) { free(pkt_wpending); pkt_wpending = NULL; } free(pkt_rpending); // clean up incoming. while(incoming.size() > 0) { RsItem *i = incoming.front(); incoming.pop_front(); delete i; } return; } // Get/Send Items. int pqistreamer::SendItem(RsItem *si,uint32_t& out_size) { #ifdef RSITEM_DEBUG { std::string out = "pqistreamer::SendItem():\n"; si -> print_string(out); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out); std::cerr << out; } #endif return queue_outpqi(si,out_size); } RsItem *pqistreamer::GetItem() { #ifdef DEBUG_PQISTREAMER pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::GetItem()"); #endif if(incoming.empty()) return NULL; RsItem *osr = incoming.front() ; incoming.pop_front() ; return osr; } // // PQInterface int pqistreamer::tick() { #ifdef DEBUG_PQISTREAMER { std::string out = "pqistreamer::tick()\n" + PeerId(); rs_sprintf_append(out, ": currRead/Sent: %d/%d", currRead, currSent); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out); } #endif bio->tick(); /* short circuit everything is bio isn't active */ if (!(bio->isactive())) { return 0; } /* must do both, as outgoing will catch some bad sockets, * that incoming will not */ handleincoming(); handleoutgoing(); #ifdef DEBUG_PQISTREAMER /* give details of the packets */ { std::list::iterator it; std::string out = "pqistreamer::tick() Queued Data: for " + PeerId(); if (bio->isactive()) { out += " (active)"; } else { out += " (waiting)"; } out += "\n"; { RsStackMutex stack(streamerMtx) ; // lock out_pkt and out_data int total = compute_out_pkt_size() ; rs_sprintf_append(out, "\t Out Packets [%d] => %d bytes\n", out_queue_size(), total); rs_sprintf_append(out, "\t Incoming [%d]\n", incoming.size()); } pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out); } #endif /* if there is more stuff in the queues */ if ((incoming.size() > 0) || (out_queue_size() > 0)) { return 1; } return 0; } int pqistreamer::status() { #ifdef DEBUG_PQISTREAMER pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::status()"); if (bio->isactive()) { std::string out; rs_sprintf(out, "Data in:%d out:%d", totalRead, totalSent); pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out); } #endif return 0; } void pqistreamer::locked_storeInOutputQueue(void *ptr,int) { out_pkt.push_back(ptr); } // /**************** HANDLE OUTGOING TRANSLATION + TRANSMISSION ******/ int pqistreamer::queue_outpqi(RsItem *pqi,uint32_t& pktsize) { pktsize = 0 ; #ifdef DEBUG_PQISTREAMER std::cerr << "pqistreamer::queue_outpqi() called." << std::endl; #endif RsStackMutex stack(streamerMtx) ; // lock out_pkt and out_data // This is called by different threads, and by threads that are not the handleoutgoing thread, // so it should be protected by a mutex !! #ifdef DEBUG_PQISTREAMER if(dynamic_cast(pqi)!=NULL && (bio_flags & BIN_FLAGS_NO_DELETE)) { std::cerr << "Having file data with flags = " << bio_flags << std::endl ; } pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::queue_outpqi()"); #endif /* decide which type of packet it is */ pktsize = rsSerialiser->size(pqi); void *ptr = malloc(pktsize); #ifdef DEBUG_PQISTREAMER std::cerr << "pqistreamer::queue_outpqi() serializing packet with packet size : " << pktsize << std::endl; #endif if (rsSerialiser->serialise(pqi, ptr, &pktsize)) { locked_storeInOutputQueue(ptr,pqi->priority_level()) ; if (!(bio_flags & BIN_FLAGS_NO_DELETE)) { delete pqi; } return 1; } else { /* cleanup serialiser */ free(ptr); } std::string out = "pqistreamer::queue_outpqi() Null Pkt generated!\nCaused By:\n"; pqi -> print_string(out); pqioutput(PQL_ALERT, pqistreamerzone, out); if (!(bio_flags & BIN_FLAGS_NO_DELETE)) { delete pqi; } return 1; // keep error internal. } int pqistreamer::handleincomingitem(RsItem *pqi) { #ifdef DEBUG_PQISTREAMER pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::handleincomingitem()"); #endif // timestamp last received packet. mLastIncomingTs = time(NULL); // Use overloaded Contact function pqi -> PeerId(PeerId()); incoming.push_back(pqi); return 1; } time_t pqistreamer::getLastIncomingTS() { return mLastIncomingTs; } int pqistreamer::handleoutgoing() { RsStackMutex stack(streamerMtx) ; // lock out_pkt and out_data #ifdef DEBUG_PQISTREAMER pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::handleoutgoing()"); #endif int maxbytes = outAllowedBytes(); int sentbytes = 0; int len; int ss; // std::cerr << "pqistreamer: maxbytes=" << maxbytes<< std::endl ; std::list::iterator it; // if not connection, or cannot send anything... pause. if (!(bio->isactive())) { /* if we are not active - clear anything in the queues. */ locked_clear_out_queue() ; /* also remove the pending packets */ if (pkt_wpending) { free(pkt_wpending); pkt_wpending = NULL; } outSentBytes(sentbytes); return 0; } // a very simple round robin bool sent = true; while(sent) // catch if all items sent. { sent = false; if ((!(bio->cansend())) || (maxbytes < sentbytes)) { #ifdef DEBUG_TRANSFERS if (maxbytes < sentbytes) { std::cerr << "pqistreamer::handleoutgoing() Stopped sending sentbytes > maxbytes. Sent " << sentbytes << " bytes "; std::cerr << std::endl; } else { std::cerr << "pqistreamer::handleoutgoing() Stopped sending at cansend() is false"; std::cerr << std::endl; } #endif outSentBytes(sentbytes); return 0; } // send a out_pkt., else send out_data. unless // there is a pending packet. if (!pkt_wpending) pkt_wpending = locked_pop_out_data() ; if (pkt_wpending) { // write packet. len = getRsItemSize(pkt_wpending); #ifdef DEBUG_PQISTREAMER std::cout << "Sending Out Pkt of size " << len << " !" << std::endl; #endif if (len != (ss = bio->senddata(pkt_wpending, len))) { #ifdef DEBUG_PQISTREAMER std::string out; rs_sprintf(out, "Problems with Send Data! (only %d bytes sent, total pkt size=%d)", ss, len); // std::cerr << out << std::endl ; pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out); #endif outSentBytes(sentbytes); // pkt_wpending will kept til next time. // ensuring exactly the same data is written (openSSL requirement). return -1; } #ifdef DEBUG_TRANSFERS std::cerr << "pqistreamer::handleoutgoing() Sent Packet len: " << len << " @ " << RsUtil::AccurateTimeString(); std::cerr << std::endl; #endif free(pkt_wpending); pkt_wpending = NULL; sentbytes += len; sent = true; } } outSentBytes(sentbytes); return 1; } /* Handles reading from input stream. */ int pqistreamer::handleincoming() { int readbytes = 0; static const int max_failed_read_attempts = 2000 ; #ifdef DEBUG_PQISTREAMER pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::handleincoming()"); #endif if(!(bio->isactive())) { reading_state = reading_state_initial ; inReadBytes(readbytes); return 0; } // enough space to read any packet. int maxlen = pkt_rpend_size; void *block = pkt_rpending; // initial read size: basic packet. int blen = getRsPktBaseSize(); int maxin = inAllowedBytes(); #ifdef DEBUG_PQISTREAMER std::cerr << "[" << (void*)pthread_self() << "] " << "reading state = " << reading_state << std::endl ; #endif switch(reading_state) { case reading_state_initial: /*std::cerr << "jumping to start" << std::endl; */ goto start_packet_read ; case reading_state_packet_started: /*std::cerr << "jumping to middle" << std::endl;*/ goto continue_packet ; } start_packet_read: { // scope to ensure variable visibility // read the basic block (minimum packet size) int tmplen; #ifdef DEBUG_PQISTREAMER std::cerr << "[" << (void*)pthread_self() << "] " << "starting packet" << std::endl ; #endif memset(block,0,blen) ; // reset the block, to avoid uninitialized memory reads. if (blen != (tmplen = bio->readdata(block, blen))) { pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "pqistreamer::handleincoming() Didn't read BasePkt!"); inReadBytes(readbytes); // error.... (either blocked or failure) if (tmplen == 0) { #ifdef DEBUG_PQISTREAMER // most likely blocked! pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "pqistreamer::handleincoming() read blocked"); std::cerr << "[" << (void*)pthread_self() << "] " << "given up 1" << std::endl ; #endif return 0; } else if (tmplen < 0) { // Most likely it is that the packet is pending but could not be read by pqissl because of stream flow. // So we return without an error, and leave the machine state in 'start_read'. // //pqioutput(PQL_WARNING, pqistreamerzone, "pqistreamer::handleincoming() Error in bio read"); #ifdef DEBUG_PQISTREAMER std::cerr << "[" << (void*)pthread_self() << "] " << "given up 2, state = " << reading_state << std::endl ; #endif return 0; } else // tmplen > 0 { // strange case....This should never happen as partial reads are handled by pqissl below. #ifdef DEBUG_PQISTREAMER std::string out = "pqistreamer::handleincoming() Incomplete "; rs_sprintf_append(out, "(Strange) read of %d bytes", tmplen); pqioutput(PQL_ALERT, pqistreamerzone, out); std::cerr << "[" << (void*)pthread_self() << "] " << "given up 3" << std::endl ; #endif return -1; } } #ifdef DEBUG_PQISTREAMER std::cerr << "[" << (void*)pthread_self() << "] " << "block 0 : " << (int)(((unsigned char*)block)[0]) << " " << (int)(((unsigned char*)block)[1]) << " " << (int)(((unsigned char*)block)[2]) << " " << (int)(((unsigned char*)block)[3]) << " " << (int)(((unsigned char*)block)[4]) << " " << (int)(((unsigned char*)block)[5]) << " " << (int)(((unsigned char*)block)[6]) << " " << (int)(((unsigned char*)block)[7]) << " " << std::endl ; #endif readbytes += blen; reading_state = reading_state_packet_started ; failed_read_attempts = 0 ; // reset failed read, as the packet has been totally read. } continue_packet: { // workout how much more to read. int extralen = getRsItemSize(block) - blen; #ifdef DEBUG_PQISTREAMER std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet getRsItemSize(block) = " << getRsItemSize(block) << std::endl ; std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet extralen = " << extralen << std::endl ; std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << reading_state << std::endl ; std::cerr << "[" << (void*)pthread_self() << "] " << "block 1 : " << (int)(((unsigned char*)block)[0]) << " " << (int)(((unsigned char*)block)[1]) << " " << (int)(((unsigned char*)block)[2]) << " " << (int)(((unsigned char*)block)[3]) << " " << (int)(((unsigned char*)block)[4]) << " " << (int)(((unsigned char*)block)[5]) << " " << (int)(((unsigned char*)block)[6]) << " " << (int)(((unsigned char*)block)[7]) << " " << std::endl ; #endif if (extralen > maxlen - blen) { pqioutput(PQL_ALERT, pqistreamerzone, "ERROR: Read Packet too Big!"); pqiNotify *notify = getPqiNotify(); if (notify) { std::string title = "Warning: Bad Packet Read"; std::string msg; msg = " **** WARNING **** \n"; msg += "Retroshare has caught a BAD Packet Read"; msg += "\n"; msg += "This is normally caused by connecting to an"; msg += " OLD version of Retroshare"; msg += "\n"; rs_sprintf_append(msg, "(M:%d B:%d E:%d)\n", maxlen, blen, extralen); msg += "\n"; rs_sprintf_append(msg, "block = %d %d %d %d %d %d %d %d\n", (int)(((unsigned char*)block)[0]), (int)(((unsigned char*)block)[1]), (int)(((unsigned char*)block)[2]), (int)(((unsigned char*)block)[3]), (int)(((unsigned char*)block)[4]), (int)(((unsigned char*)block)[5]), (int)(((unsigned char*)block)[6]), (int)(((unsigned char*)block)[7])) ; msg += "\n"; msg += "Please get your friends to upgrade to the latest version"; msg += "\n"; msg += "\n"; msg += "If you are sure the error was not caused by an old version"; msg += "\n"; msg += "Please report the problem to Retroshare's developers"; msg += "\n"; notify->AddLogMessage(0, RS_SYS_WARNING, title, msg); std::cerr << "pqistreamer::handle_incoming() ERROR: Read Packet too Big" << std::endl; std::cerr << msg; std::cerr << std::endl; } bio->close(); reading_state = reading_state_initial ; // restart at state 1. failed_read_attempts = 0 ; return -1; // Used to exit now! exit(1); } if (extralen > 0) { void *extradata = (void *) (((char *) block) + blen); int tmplen ; // Don't reset the block now! If pqissl is in the middle of a multiple-chunk // packet (larger than 16384 bytes), and pqistreamer jumped directly yo // continue_packet:, then readdata is going to write after the beginning of // extradata, yet not exactly at start -> the start of the packet would be wiped out. // // so, don't do that: // memset( extradata,0,extralen ) ; if (extralen != (tmplen = bio->readdata(extradata, extralen))) { #ifdef DEBUG_PQISTREAMER if(tmplen > 0) std::cerr << "[" << (void*)pthread_self() << "] " << "Incomplete packet read ! This is a real problem ;-)" << std::endl ; #endif if(++failed_read_attempts > max_failed_read_attempts) { std::string out; rs_sprintf(out, "Error Completing Read (read %d/%d)", tmplen, extralen); std::cerr << out << std::endl ; pqioutput(PQL_ALERT, pqistreamerzone, out); pqiNotify *notify = getPqiNotify(); if (notify) { std::string title = "Warning: Error Completing Read"; std::string msgout; msgout = " **** WARNING **** \n"; msgout += "Retroshare has experienced an unexpected Read ERROR"; msgout += "\n"; rs_sprintf_append(msgout, "(M:%d B:%d E:%d R:%d)\n", maxlen, blen, extralen, tmplen); msgout += "\n"; msgout += "Note: this error might as well happen (rarely) when a peer disconnects in between a transmission of a large packet.\n"; msgout += "If it happens manny time, please contact the developers, and send them these numbers:"; msgout += "\n"; rs_sprintf_append(msgout, "block = %d %d %d %d %d %d %d %d\n", (int)(((unsigned char*)block)[0]), (int)(((unsigned char*)block)[1]), (int)(((unsigned char*)block)[2]), (int)(((unsigned char*)block)[3]), (int)(((unsigned char*)block)[4]), (int)(((unsigned char*)block)[5]), (int)(((unsigned char*)block)[6]), (int)(((unsigned char*)block)[7])); //notify->AddSysMessage(0, RS_SYS_WARNING, title, msgout.str()); std::cerr << msgout << std::endl; } bio->close(); reading_state = reading_state_initial ; // restart at state 1. failed_read_attempts = 0 ; return -1; } else { #ifdef DEBUG_PQISTREAMER std::cerr << "[" << (void*)pthread_self() << "] " << "given up 5, state = " << reading_state << std::endl ; #endif return 0 ; // this is just a SSL_WANT_READ error. Don't panic, we'll re-try the read soon. // we assume readdata() returned either -1 or the complete read size. } } #ifdef DEBUG_PQISTREAMER std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << reading_state << std::endl ; std::cerr << "[" << (void*)pthread_self() << "] " << "block 2 : " << (int)(((unsigned char*)extradata)[0]) << " " << (int)(((unsigned char*)extradata)[1]) << " " << (int)(((unsigned char*)extradata)[2]) << " " << (int)(((unsigned char*)extradata)[3]) << " " << (int)(((unsigned char*)extradata)[4]) << " " << (int)(((unsigned char*)extradata)[5]) << " " << (int)(((unsigned char*)extradata)[6]) << " " << (int)(((unsigned char*)extradata)[7]) << " " << std::endl ; #endif failed_read_attempts = 0 ; readbytes += extralen; } // create packet, based on header. #ifdef DEBUG_PQISTREAMER { std::string out; rs_sprintf(out, "Read Data Block -> Incoming Pkt(%d)", blen + extralen); //std::cerr << out ; pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out); } #endif // std::cerr << "Deserializing packet of size " << pktlen <deserialise(block, &pktlen); if ((pkt != NULL) && (0 < handleincomingitem(pkt))) { #ifdef DEBUG_PQISTREAMER pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "Successfully Read a Packet!"); #endif } else { #ifdef DEBUG_PQISTREAMER pqioutput(PQL_ALERT, pqistreamerzone, "Failed to handle Packet!"); #endif } reading_state = reading_state_initial ; // restart at state 1. failed_read_attempts = 0 ; // reset failed read, as the packet has been totally read. } if(maxin > readbytes && bio->moretoread()) goto start_packet_read ; #ifdef DEBUG_TRANSFERS if (readbytes >= maxin) { std::cerr << "pqistreamer::handleincoming() Stopped reading as readbytes >= maxin. Read " << readbytes << " bytes "; std::cerr << std::endl; } #endif inReadBytes(readbytes); return 0; } /* BandWidth Management Assistance */ float pqistreamer::outTimeSlice() { #ifdef DEBUG_PQISTREAMER pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::outTimeSlice()"); #endif //fixme("pqistreamer::outTimeSlice()", 1); return 1; } // very simple..... int pqistreamer::outAllowedBytes() { int t = time(NULL); // get current timestep. /* allow a lot if not bandwidthLimited */ if (!bio->bandwidthLimited()) { currSent = 0; currSentTS = t; return PQISTREAM_ABS_MAX; } int dt = t - currSentTS; // limiter -> for when currSentTs -> 0. if (dt > 5) dt = 5; int maxout = (int) (getMaxRate(false) * 1000.0); currSent -= dt * maxout; if (currSent < 0) { currSent = 0; } currSentTS = t; #ifdef DEBUG_PQISTREAMER { std::string out; rs_sprintf(out, "pqistreamer::outAllowedBytes() is %d/%d", maxout - currSent, maxout); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out); } #endif return maxout - currSent; } int pqistreamer::inAllowedBytes() { int t = time(NULL); // get current timestep. /* allow a lot if not bandwidthLimited */ if (!bio->bandwidthLimited()) { currReadTS = t; currRead = 0; return PQISTREAM_ABS_MAX; } int dt = t - currReadTS; // limiter -> for when currReadTs -> 0. if (dt > 5) dt = 5; int maxin = (int) (getMaxRate(true) * 1000.0); currRead -= dt * maxin; if (currRead < 0) { currRead = 0; } currReadTS = t; #ifdef DEBUG_PQISTREAMER { std::string out; rs_sprintf(out, "pqistreamer::inAllowedBytes() is %d/%d", maxin - currRead, maxin); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out); } #endif return maxin - currRead; } static const float AVG_PERIOD = 5; // sec static const float AVG_FRAC = 0.8; // for low pass filter. void pqistreamer::outSentBytes(int outb) { #ifdef DEBUG_PQISTREAMER { std::string out; rs_sprintf(out, "pqistreamer::outSentBytes(): %d@%gkB/s", outb, getRate(false)); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out); } #endif /*** One theory for the massive delays - is that the queue here is filling up ****/ //#define DEBUG_LAG 1 #ifdef DEBUG_LAG #define MIN_PKTS_FOR_MSG 100 if (out_queue_size() > MIN_PKTS_FOR_MSG) { std::cerr << "pqistreamer::outSentBytes() for: " << PeerId(); std::cerr << " End of Write and still " << out_queue_size() << " pkts left"; std::cerr << std::endl; } #endif totalSent += outb; currSent += outb; avgSentCount += outb; int t = time(NULL); // get current timestep. if (t - avgLastUpdate > AVG_PERIOD) { float avgReadpSec = getRate(true); float avgSentpSec = getRate(false); avgReadpSec *= AVG_FRAC; avgReadpSec += (1.0 - AVG_FRAC) * avgReadCount / (1000.0 * (t - avgLastUpdate)); avgSentpSec *= AVG_FRAC; avgSentpSec += (1.0 - AVG_FRAC) * avgSentCount / (1000.0 * (t - avgLastUpdate)); /* pretend our rate is zero if we are * not bandwidthLimited(). */ if (bio->bandwidthLimited()) { setRate(true, avgReadpSec); setRate(false, avgSentpSec); } else { setRate(true, 0); setRate(false, 0); } avgLastUpdate = t; avgReadCount = 0; avgSentCount = 0; } return; } void pqistreamer::inReadBytes(int inb) { #ifdef DEBUG_PQISTREAMER { std::string out; rs_sprintf(out, "pqistreamer::inReadBytes(): %d@%gkB/s", inb, getRate(true)); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out); } #endif totalRead += inb; currRead += inb; avgReadCount += inb; return; } int pqistreamer::getQueueSize(bool in) { if (in) return incoming.size(); return out_queue_size(); } void pqistreamer::getRates(RsBwRates &rates) { RateInterface::getRates(rates); rates.mQueueIn = incoming.size(); rates.mQueueOut = out_queue_size(); } int pqistreamer::out_queue_size() const { // Warning: because out_pkt is a list, calling size // is O(n) ! Makign algorithms pretty inefficient. We should record how many // items get stored and discarded to have a proper size value at any time // return out_pkt.size() ; } void pqistreamer::locked_clear_out_queue() { for(std::list::iterator it = out_pkt.begin(); it != out_pkt.end(); ) { free(*it); it = out_pkt.erase(it); #ifdef DEBUG_PQISTREAMER std::string out = "pqistreamer::handleoutgoing() Not active -> Clearing Pkt!"; // std::cerr << out ; pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out); #endif } } int pqistreamer::locked_compute_out_pkt_size() const { int total = 0 ; for(std::list::const_iterator it = out_pkt.begin(); it != out_pkt.end(); ++it) total += getRsItemSize(*it); return total ; } void *pqistreamer::locked_pop_out_data() { void *res = NULL ; if (!out_pkt.empty()) { res = *(out_pkt.begin()); out_pkt.pop_front(); #ifdef DEBUG_TRANSFERS std::cerr << "pqistreamer::handleoutgoing() getting next pkt from out_pkt queue"; std::cerr << std::endl; #endif } return res ; }