diff --git a/libretroshare/src/pqi/pqiperson.h b/libretroshare/src/pqi/pqiperson.h index 0a57cd1c5..f3ff940e1 100644 --- a/libretroshare/src/pqi/pqiperson.h +++ b/libretroshare/src/pqi/pqiperson.h @@ -63,8 +63,8 @@ public: virtual int connect(const struct sockaddr_storage &raddr) { return ni->connect(raddr); } virtual int listen() { return ni->listen(); } virtual int stoplistening() { return ni->stoplistening(); } - virtual int reset() { return ni->reset(); } - virtual int disconnect() { return ni->reset(); } + virtual int reset() { pqistreamer::reset(); return ni->reset(); } + virtual int disconnect() { return reset() ; } virtual bool connect_parameter(uint32_t type, uint32_t value) { return ni->connect_parameter(type, value);} virtual bool connect_parameter(uint32_t type, std::string value) { return ni->connect_parameter(type, value);} virtual bool connect_additional_address(uint32_t type, const struct sockaddr_storage &addr) { return ni->connect_additional_address(type, addr); } diff --git a/libretroshare/src/pqi/pqiqosstreamer.cc b/libretroshare/src/pqi/pqiqosstreamer.cc index c9cc7d570..b1d0aac1d 100644 --- a/libretroshare/src/pqi/pqiqosstreamer.cc +++ b/libretroshare/src/pqi/pqiqosstreamer.cc @@ -25,6 +25,8 @@ #include "pqiqosstreamer.h" +//#define DEBUG_PQIQOSSTREAMER 1 + const float pqiQoSstreamer::PQI_QOS_STREAMER_ALPHA = 2.0f ; pqiQoSstreamer::pqiQoSstreamer(PQInterface *parent, RsSerialiser *rss, const RsPeerId& peerid, BinInterface *bio_in, int bio_flagsin) @@ -60,6 +62,11 @@ void pqiQoSstreamer::locked_storeInOutputQueue(void *ptr,int size,int priority) void pqiQoSstreamer::locked_clear_out_queue() { +#ifdef DEBUG_PQIQOSSTREAMER + if(qos_queue_size() > 0) + std::cerr << " pqiQoSstreamer::locked_clear_out_queue(): clearing " << qos_queue_size() << " pending outqueue elements." << std::endl; +#endif + pqiQoS::clear() ; _total_item_size = 0 ; _total_item_count = 0 ; diff --git a/libretroshare/src/pqi/pqissl.cc b/libretroshare/src/pqi/pqissl.cc index 031cd2d7f..49cedfbbd 100644 --- a/libretroshare/src/pqi/pqissl.cc +++ b/libretroshare/src/pqi/pqissl.cc @@ -1661,8 +1661,6 @@ int pqissl::readdata(void *data, int len) int error = SSL_get_error(ssl_connection, tmppktlen); unsigned long err2 = ERR_get_error(); - //printSSLError(ssl_connection, tmppktlen, error, err2, out); - if ((error == SSL_ERROR_ZERO_RETURN) && (err2 == 0)) { /* this code will be called when @@ -1761,7 +1759,10 @@ int pqissl::readdata(void *data, int len) rs_sprintf_append(out, "SSL_read() UNKNOWN ERROR: %d Resetting!", error); rslog(RSL_ALERT, pqisslzone, out); std::cerr << out << std::endl ; + std::cerr << ", SSL_read() output is " << tmppktlen << std::endl ; + printSSLError(ssl_connection, tmppktlen, error, err2, out); + rslog(RSL_ALERT, pqisslzone, "pqissl::readdata() -> calling reset()"); reset_locked(); return -1; diff --git a/libretroshare/src/pqi/pqistreamer.cc b/libretroshare/src/pqi/pqistreamer.cc index f15bc3eee..166cc6b1e 100644 --- a/libretroshare/src/pqi/pqistreamer.cc +++ b/libretroshare/src/pqi/pqistreamer.cc @@ -135,17 +135,7 @@ pqistreamer::~pqistreamer() if (mRsSerialiser) delete mRsSerialiser; - // clean up outgoing. (cntrl packets) - locked_clear_out_queue() ; - - if (mPkt_wpending) - { - free(mPkt_wpending); - mPkt_wpending = NULL; - mPkt_wpending_size = 0 ; - } - - free_rpend_locked(); + free_pend_locked(); // clean up incoming. while(!mIncoming.empty()) @@ -156,6 +146,7 @@ pqistreamer::~pqistreamer() delete i; } + if(mIncomingSize != 0) std::cerr << "(EE) inconsistency after deleting pqistreamer queue. Remaining items: " << mIncomingSize << std::endl; return; @@ -282,7 +273,7 @@ int pqistreamer::tick_recv(uint32_t timeout) } if(!(mBio->isactive())) { - free_rpend_locked(); + free_pend_locked(); } return 1; } @@ -295,6 +286,7 @@ int pqistreamer::tick_send(uint32_t timeout) /* short circuit everything is bio isn't active */ if (!(mBio->isactive())) { + free_pend_locked(); return 0; } @@ -690,7 +682,7 @@ int pqistreamer::handleincoming_locked() if(!(mBio->isactive())) { mReading_state = reading_state_initial ; - free_rpend_locked(); + free_pend_locked(); return 0; } else @@ -762,7 +754,7 @@ start_packet_read: } } #ifdef DEBUG_PQISTREAMER - std::cerr << "[" << (void*)pthread_self() << "] " << "block 0 : " << RsUtil::BinToHex(block,8) << std::endl; + std::cerr << "[" << (void*)pthread_self() << "] " << "block 0 : " << RsUtil::BinToHex((unsigned char*)block,8) << std::endl; #endif readbytes += blen; @@ -806,12 +798,12 @@ continue_packet: else extralen = getRsItemSize(block) - blen; // old style packet type -#ifdef DEBUG_PQISTREAMER +#ifdef DEBUG_PACKET_SLICING std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet getRsItemSize(block) = " << getRsItemSize(block) << std::endl ; std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet extralen = " << extralen << std::endl ; std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << mReading_state << std::endl ; - std::cerr << "[" << (void*)pthread_self() << "] " << "block 1 : " << RsUtil::BinToHex(block,8) << std::endl; + std::cerr << "[" << (void*)pthread_self() << "] " << "block 1 : " << RsUtil::BinToHex((unsigned char*)block,8) << std::endl; #endif if (extralen + (uint32_t)blen > maxlen) { @@ -874,7 +866,7 @@ continue_packet: if (extralen != (uint32_t)(tmplen = mBio->readdata(extradata, extralen))) { -#ifdef DEBUG_PQISTREAMER +#ifdef DEBUG_PACKET_SLICING if(tmplen > 0) std::cerr << "[" << (void*)pthread_self() << "] " << "Incomplete packet read ! This is a real problem ;-)" << std::endl ; #endif @@ -921,9 +913,9 @@ continue_packet: // we assume readdata() returned either -1 or the complete read size. } } -#ifdef DEBUG_PQISTREAMER +#ifdef DEBUG_PACKET_SLICING std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << mReading_state << std::endl ; - std::cerr << "[" << (void*)pthread_self() << "] " << "block 2 : " << RsUtil::BinToHex(extradata,8) << std::endl; + std::cerr << "[" << (void*)pthread_self() << "] " << "block 2 : " << RsUtil::BinToHex((unsigned char*)extradata,8) << std::endl; #endif mFailed_read_attempts = 0 ; @@ -952,7 +944,7 @@ continue_packet: std::cerr << "Inputing partial packet " << RsUtil::BinToHex((char*)block,8) << std::endl; #endif uint32_t packet_length = 0 ; - pkt = addPartialPacket(block,pktlen,slice_packet_id,is_packet_starting,is_packet_ending,packet_length) ; + pkt = addPartialPacket_locked(block,pktlen,slice_packet_id,is_packet_starting,is_packet_ending,packet_length) ; pktlen = packet_length ; } @@ -997,7 +989,7 @@ continue_packet: return 0; } -RsItem *pqistreamer::addPartialPacket(const void *block, uint32_t len, uint32_t slice_packet_id, bool is_packet_starting, bool is_packet_ending, uint32_t &total_len) +RsItem *pqistreamer::addPartialPacket_locked(const void *block, uint32_t len, uint32_t slice_packet_id, bool is_packet_starting, bool is_packet_ending, uint32_t &total_len) { #ifdef DEBUG_PACKET_SLICING std::cerr << "Receiving partial packet. size=" << len << ", ID=" << std::hex << slice_packet_id << std::dec << ", starting:" << is_packet_starting << ", ending:" << is_packet_ending ; @@ -1180,7 +1172,7 @@ void pqistreamer::outSentBytes_locked(uint32_t outb) #ifdef DEBUG_PQISTREAMER { std::string out; - rs_sprintf(out, "pqistreamer::outSentBytes(): %d@%gkB/s", outb, getRate(false)); + rs_sprintf(out, "pqistreamer::outSentBytes(): %d@%gkB/s", outb, RateInterface::getRate(false)); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out); } #endif @@ -1210,7 +1202,7 @@ void pqistreamer::inReadBytes_locked(uint32_t inb) #ifdef DEBUG_PQISTREAMER { std::string out; - rs_sprintf(out, "pqistreamer::inReadBytes(): %d@%gkB/s", inb, getRate(true)); + rs_sprintf(out, "pqistreamer::inReadBytes(): %d@%gkB/s", inb, RateInterface::getRate(true)); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out); } #endif @@ -1237,14 +1229,53 @@ void pqistreamer::allocate_rpend_locked() memset(mPkt_rpending,0,mPkt_rpend_size) ; } -void pqistreamer::free_rpend_locked() -{ - if(!mPkt_rpending) - return; +// clean everything that is half-finished, to avoid causing issues when re-connecting later on. - free(mPkt_rpending); - mPkt_rpending = 0; - mPkt_rpend_size = 0; +int pqistreamer::reset() +{ + RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ +#ifdef DEBUG_PQISTREAMER + std::cerr << "pqistreamer::reset()" << std::endl; +#endif + free_pend_locked(); + + return 1 ; +} + +void pqistreamer::free_pend_locked() +{ + if(mPkt_rpending) + { +#ifdef DEBUG_PQISTREAMER + std::cerr << "pqistreamer::free_pend_locked(): pending input packet buffer" << std::endl; +#endif + free(mPkt_rpending); + mPkt_rpending = 0; + } + mPkt_rpend_size = 0; + + if (mPkt_wpending) + { +#ifdef DEBUG_PQISTREAMER + std::cerr << "pqistreamer::free_pend_locked(): pending output packet buffer" << std::endl; +#endif + free(mPkt_wpending); + mPkt_wpending = NULL; + } + mPkt_wpending_size = 0 ; + +#ifdef DEBUG_PQISTREAMER + if(!mPartialPackets.empty()) + std::cerr << "pqistreamer::free_pend_locked(): " << mPartialPackets.size() << " pending input partial packets" << std::endl; +#endif + // also delete any incoming partial packet + for(std::map::iterator it(mPartialPackets.begin());it!=mPartialPackets.end();++it) + free(it->second.mem) ; + + mPartialPackets.clear() ; + + // clean up outgoing. (cntrl packets) + locked_clear_out_queue() ; } int pqistreamer::gatherStatistics(std::list& outqueue_lst,std::list& inqueue_lst) diff --git a/libretroshare/src/pqi/pqistreamer.h b/libretroshare/src/pqi/pqistreamer.h index 542888e17..52a376062 100644 --- a/libretroshare/src/pqi/pqistreamer.h +++ b/libretroshare/src/pqi/pqistreamer.h @@ -74,6 +74,7 @@ class pqistreamer: public PQInterface virtual float getRate(bool b) ; protected: + virtual int reset() ; int tick_bio(); int tick_send(uint32_t timeout); @@ -117,7 +118,8 @@ class pqistreamer: public PQInterface int inAllowedBytes_locked(); void inReadBytes_locked(uint32_t ); - + // cleans up everything that's pending / half finished. + void free_pend_locked(); // RsSerialiser - determines which packets can be serialised. RsSerialiser *mRsSerialiser; @@ -126,7 +128,7 @@ class pqistreamer: public PQInterface uint32_t mPkt_wpending_size; // ... and its size. void allocate_rpend_locked(); // use these two functions to allocate/free the buffer below - void free_rpend_locked(); + int mPkt_rpend_size; // size of pkt_rpending. void *mPkt_rpending; // storage for read in pending packets. @@ -170,7 +172,7 @@ class pqistreamer: public PQInterface bool mAcceptsPacketSlicing ; time_t mLastSentPacketSlicingProbe ; void locked_addTrafficClue(const RsItem *pqi, uint32_t pktsize, std::list &lst); - RsItem *addPartialPacket(const void *block, uint32_t len, uint32_t slice_packet_id,bool packet_starting,bool packet_ending,uint32_t& total_len); + RsItem *addPartialPacket_locked(const void *block, uint32_t len, uint32_t slice_packet_id,bool packet_starting,bool packet_ending,uint32_t& total_len); std::map mPartialPackets ; };