added auto-clean of pending data in pqistreamer after connection is closed, to avoid confusing the peer after re-connecting (probably not the cause of the SSL errors we see)

This commit is contained in:
csoler 2016-06-25 19:12:35 +01:00
parent 043c5b1f22
commit 31089d7a91
5 changed files with 78 additions and 37 deletions

View File

@ -63,8 +63,8 @@ public:
virtual int connect(const struct sockaddr_storage &raddr) { return ni->connect(raddr); } virtual int connect(const struct sockaddr_storage &raddr) { return ni->connect(raddr); }
virtual int listen() { return ni->listen(); } virtual int listen() { return ni->listen(); }
virtual int stoplistening() { return ni->stoplistening(); } virtual int stoplistening() { return ni->stoplistening(); }
virtual int reset() { return ni->reset(); } virtual int reset() { pqistreamer::reset(); return ni->reset(); }
virtual int disconnect() { return ni->reset(); } virtual int disconnect() { return reset() ; }
virtual bool connect_parameter(uint32_t type, uint32_t value) { return ni->connect_parameter(type, value);} virtual bool connect_parameter(uint32_t type, uint32_t value) { return ni->connect_parameter(type, value);}
virtual bool connect_parameter(uint32_t type, std::string value) { return ni->connect_parameter(type, value);} virtual bool connect_parameter(uint32_t type, std::string value) { return ni->connect_parameter(type, value);}
virtual bool connect_additional_address(uint32_t type, const struct sockaddr_storage &addr) { return ni->connect_additional_address(type, addr); } virtual bool connect_additional_address(uint32_t type, const struct sockaddr_storage &addr) { return ni->connect_additional_address(type, addr); }

View File

@ -25,6 +25,8 @@
#include "pqiqosstreamer.h" #include "pqiqosstreamer.h"
//#define DEBUG_PQIQOSSTREAMER 1
const float pqiQoSstreamer::PQI_QOS_STREAMER_ALPHA = 2.0f ; const float pqiQoSstreamer::PQI_QOS_STREAMER_ALPHA = 2.0f ;
pqiQoSstreamer::pqiQoSstreamer(PQInterface *parent, RsSerialiser *rss, const RsPeerId& peerid, BinInterface *bio_in, int bio_flagsin) pqiQoSstreamer::pqiQoSstreamer(PQInterface *parent, RsSerialiser *rss, const RsPeerId& peerid, BinInterface *bio_in, int bio_flagsin)
@ -60,6 +62,11 @@ void pqiQoSstreamer::locked_storeInOutputQueue(void *ptr,int size,int priority)
void pqiQoSstreamer::locked_clear_out_queue() void pqiQoSstreamer::locked_clear_out_queue()
{ {
#ifdef DEBUG_PQIQOSSTREAMER
if(qos_queue_size() > 0)
std::cerr << " pqiQoSstreamer::locked_clear_out_queue(): clearing " << qos_queue_size() << " pending outqueue elements." << std::endl;
#endif
pqiQoS::clear() ; pqiQoS::clear() ;
_total_item_size = 0 ; _total_item_size = 0 ;
_total_item_count = 0 ; _total_item_count = 0 ;

View File

@ -1661,8 +1661,6 @@ int pqissl::readdata(void *data, int len)
int error = SSL_get_error(ssl_connection, tmppktlen); int error = SSL_get_error(ssl_connection, tmppktlen);
unsigned long err2 = ERR_get_error(); unsigned long err2 = ERR_get_error();
//printSSLError(ssl_connection, tmppktlen, error, err2, out);
if ((error == SSL_ERROR_ZERO_RETURN) && (err2 == 0)) if ((error == SSL_ERROR_ZERO_RETURN) && (err2 == 0))
{ {
/* this code will be called when /* this code will be called when
@ -1761,7 +1759,10 @@ int pqissl::readdata(void *data, int len)
rs_sprintf_append(out, "SSL_read() UNKNOWN ERROR: %d Resetting!", error); rs_sprintf_append(out, "SSL_read() UNKNOWN ERROR: %d Resetting!", error);
rslog(RSL_ALERT, pqisslzone, out); rslog(RSL_ALERT, pqisslzone, out);
std::cerr << out << std::endl ; std::cerr << out << std::endl ;
std::cerr << ", SSL_read() output is " << tmppktlen << std::endl ;
printSSLError(ssl_connection, tmppktlen, error, err2, out);
rslog(RSL_ALERT, pqisslzone, "pqissl::readdata() -> calling reset()"); rslog(RSL_ALERT, pqisslzone, "pqissl::readdata() -> calling reset()");
reset_locked(); reset_locked();
return -1; return -1;

View File

@ -135,17 +135,7 @@ pqistreamer::~pqistreamer()
if (mRsSerialiser) if (mRsSerialiser)
delete mRsSerialiser; delete mRsSerialiser;
// clean up outgoing. (cntrl packets) free_pend_locked();
locked_clear_out_queue() ;
if (mPkt_wpending)
{
free(mPkt_wpending);
mPkt_wpending = NULL;
mPkt_wpending_size = 0 ;
}
free_rpend_locked();
// clean up incoming. // clean up incoming.
while(!mIncoming.empty()) while(!mIncoming.empty())
@ -156,6 +146,7 @@ pqistreamer::~pqistreamer()
delete i; delete i;
} }
if(mIncomingSize != 0) if(mIncomingSize != 0)
std::cerr << "(EE) inconsistency after deleting pqistreamer queue. Remaining items: " << mIncomingSize << std::endl; std::cerr << "(EE) inconsistency after deleting pqistreamer queue. Remaining items: " << mIncomingSize << std::endl;
return; return;
@ -282,7 +273,7 @@ int pqistreamer::tick_recv(uint32_t timeout)
} }
if(!(mBio->isactive())) if(!(mBio->isactive()))
{ {
free_rpend_locked(); free_pend_locked();
} }
return 1; return 1;
} }
@ -295,6 +286,7 @@ int pqistreamer::tick_send(uint32_t timeout)
/* short circuit everything is bio isn't active */ /* short circuit everything is bio isn't active */
if (!(mBio->isactive())) if (!(mBio->isactive()))
{ {
free_pend_locked();
return 0; return 0;
} }
@ -690,7 +682,7 @@ int pqistreamer::handleincoming_locked()
if(!(mBio->isactive())) if(!(mBio->isactive()))
{ {
mReading_state = reading_state_initial ; mReading_state = reading_state_initial ;
free_rpend_locked(); free_pend_locked();
return 0; return 0;
} }
else else
@ -762,7 +754,7 @@ start_packet_read:
} }
} }
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
std::cerr << "[" << (void*)pthread_self() << "] " << "block 0 : " << RsUtil::BinToHex(block,8) << std::endl; std::cerr << "[" << (void*)pthread_self() << "] " << "block 0 : " << RsUtil::BinToHex((unsigned char*)block,8) << std::endl;
#endif #endif
readbytes += blen; readbytes += blen;
@ -806,12 +798,12 @@ continue_packet:
else else
extralen = getRsItemSize(block) - blen; // old style packet type extralen = getRsItemSize(block) - blen; // old style packet type
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PACKET_SLICING
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet getRsItemSize(block) = " << getRsItemSize(block) << std::endl ; std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet getRsItemSize(block) = " << getRsItemSize(block) << std::endl ;
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet extralen = " << extralen << std::endl ; std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet extralen = " << extralen << std::endl ;
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << mReading_state << std::endl ; std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << mReading_state << std::endl ;
std::cerr << "[" << (void*)pthread_self() << "] " << "block 1 : " << RsUtil::BinToHex(block,8) << std::endl; std::cerr << "[" << (void*)pthread_self() << "] " << "block 1 : " << RsUtil::BinToHex((unsigned char*)block,8) << std::endl;
#endif #endif
if (extralen + (uint32_t)blen > maxlen) if (extralen + (uint32_t)blen > maxlen)
{ {
@ -874,7 +866,7 @@ continue_packet:
if (extralen != (uint32_t)(tmplen = mBio->readdata(extradata, extralen))) if (extralen != (uint32_t)(tmplen = mBio->readdata(extradata, extralen)))
{ {
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PACKET_SLICING
if(tmplen > 0) if(tmplen > 0)
std::cerr << "[" << (void*)pthread_self() << "] " << "Incomplete packet read ! This is a real problem ;-)" << std::endl ; std::cerr << "[" << (void*)pthread_self() << "] " << "Incomplete packet read ! This is a real problem ;-)" << std::endl ;
#endif #endif
@ -921,9 +913,9 @@ continue_packet:
// we assume readdata() returned either -1 or the complete read size. // we assume readdata() returned either -1 or the complete read size.
} }
} }
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PACKET_SLICING
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << mReading_state << std::endl ; std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << mReading_state << std::endl ;
std::cerr << "[" << (void*)pthread_self() << "] " << "block 2 : " << RsUtil::BinToHex(extradata,8) << std::endl; std::cerr << "[" << (void*)pthread_self() << "] " << "block 2 : " << RsUtil::BinToHex((unsigned char*)extradata,8) << std::endl;
#endif #endif
mFailed_read_attempts = 0 ; mFailed_read_attempts = 0 ;
@ -952,7 +944,7 @@ continue_packet:
std::cerr << "Inputing partial packet " << RsUtil::BinToHex((char*)block,8) << std::endl; std::cerr << "Inputing partial packet " << RsUtil::BinToHex((char*)block,8) << std::endl;
#endif #endif
uint32_t packet_length = 0 ; uint32_t packet_length = 0 ;
pkt = addPartialPacket(block,pktlen,slice_packet_id,is_packet_starting,is_packet_ending,packet_length) ; pkt = addPartialPacket_locked(block,pktlen,slice_packet_id,is_packet_starting,is_packet_ending,packet_length) ;
pktlen = packet_length ; pktlen = packet_length ;
} }
@ -997,7 +989,7 @@ continue_packet:
return 0; return 0;
} }
RsItem *pqistreamer::addPartialPacket(const void *block, uint32_t len, uint32_t slice_packet_id, bool is_packet_starting, bool is_packet_ending, uint32_t &total_len) RsItem *pqistreamer::addPartialPacket_locked(const void *block, uint32_t len, uint32_t slice_packet_id, bool is_packet_starting, bool is_packet_ending, uint32_t &total_len)
{ {
#ifdef DEBUG_PACKET_SLICING #ifdef DEBUG_PACKET_SLICING
std::cerr << "Receiving partial packet. size=" << len << ", ID=" << std::hex << slice_packet_id << std::dec << ", starting:" << is_packet_starting << ", ending:" << is_packet_ending ; std::cerr << "Receiving partial packet. size=" << len << ", ID=" << std::hex << slice_packet_id << std::dec << ", starting:" << is_packet_starting << ", ending:" << is_packet_ending ;
@ -1180,7 +1172,7 @@ void pqistreamer::outSentBytes_locked(uint32_t outb)
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
{ {
std::string out; std::string out;
rs_sprintf(out, "pqistreamer::outSentBytes(): %d@%gkB/s", outb, getRate(false)); rs_sprintf(out, "pqistreamer::outSentBytes(): %d@%gkB/s", outb, RateInterface::getRate(false));
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out);
} }
#endif #endif
@ -1210,7 +1202,7 @@ void pqistreamer::inReadBytes_locked(uint32_t inb)
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
{ {
std::string out; std::string out;
rs_sprintf(out, "pqistreamer::inReadBytes(): %d@%gkB/s", inb, getRate(true)); rs_sprintf(out, "pqistreamer::inReadBytes(): %d@%gkB/s", inb, RateInterface::getRate(true));
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out);
} }
#endif #endif
@ -1237,14 +1229,53 @@ void pqistreamer::allocate_rpend_locked()
memset(mPkt_rpending,0,mPkt_rpend_size) ; memset(mPkt_rpending,0,mPkt_rpend_size) ;
} }
void pqistreamer::free_rpend_locked() // clean everything that is half-finished, to avoid causing issues when re-connecting later on.
{
if(!mPkt_rpending)
return;
free(mPkt_rpending); int pqistreamer::reset()
mPkt_rpending = 0; {
mPkt_rpend_size = 0; RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
#ifdef DEBUG_PQISTREAMER
std::cerr << "pqistreamer::reset()" << std::endl;
#endif
free_pend_locked();
return 1 ;
}
void pqistreamer::free_pend_locked()
{
if(mPkt_rpending)
{
#ifdef DEBUG_PQISTREAMER
std::cerr << "pqistreamer::free_pend_locked(): pending input packet buffer" << std::endl;
#endif
free(mPkt_rpending);
mPkt_rpending = 0;
}
mPkt_rpend_size = 0;
if (mPkt_wpending)
{
#ifdef DEBUG_PQISTREAMER
std::cerr << "pqistreamer::free_pend_locked(): pending output packet buffer" << std::endl;
#endif
free(mPkt_wpending);
mPkt_wpending = NULL;
}
mPkt_wpending_size = 0 ;
#ifdef DEBUG_PQISTREAMER
if(!mPartialPackets.empty())
std::cerr << "pqistreamer::free_pend_locked(): " << mPartialPackets.size() << " pending input partial packets" << std::endl;
#endif
// also delete any incoming partial packet
for(std::map<uint32_t,PartialPacketRecord>::iterator it(mPartialPackets.begin());it!=mPartialPackets.end();++it)
free(it->second.mem) ;
mPartialPackets.clear() ;
// clean up outgoing. (cntrl packets)
locked_clear_out_queue() ;
} }
int pqistreamer::gatherStatistics(std::list<RSTrafficClue>& outqueue_lst,std::list<RSTrafficClue>& inqueue_lst) int pqistreamer::gatherStatistics(std::list<RSTrafficClue>& outqueue_lst,std::list<RSTrafficClue>& inqueue_lst)

View File

@ -74,6 +74,7 @@ class pqistreamer: public PQInterface
virtual float getRate(bool b) ; virtual float getRate(bool b) ;
protected: protected:
virtual int reset() ;
int tick_bio(); int tick_bio();
int tick_send(uint32_t timeout); int tick_send(uint32_t timeout);
@ -117,7 +118,8 @@ class pqistreamer: public PQInterface
int inAllowedBytes_locked(); int inAllowedBytes_locked();
void inReadBytes_locked(uint32_t ); void inReadBytes_locked(uint32_t );
// cleans up everything that's pending / half finished.
void free_pend_locked();
// RsSerialiser - determines which packets can be serialised. // RsSerialiser - determines which packets can be serialised.
RsSerialiser *mRsSerialiser; RsSerialiser *mRsSerialiser;
@ -126,7 +128,7 @@ class pqistreamer: public PQInterface
uint32_t mPkt_wpending_size; // ... and its size. uint32_t mPkt_wpending_size; // ... and its size.
void allocate_rpend_locked(); // use these two functions to allocate/free the buffer below void allocate_rpend_locked(); // use these two functions to allocate/free the buffer below
void free_rpend_locked();
int mPkt_rpend_size; // size of pkt_rpending. int mPkt_rpend_size; // size of pkt_rpending.
void *mPkt_rpending; // storage for read in pending packets. void *mPkt_rpending; // storage for read in pending packets.
@ -170,7 +172,7 @@ class pqistreamer: public PQInterface
bool mAcceptsPacketSlicing ; bool mAcceptsPacketSlicing ;
time_t mLastSentPacketSlicingProbe ; time_t mLastSentPacketSlicingProbe ;
void locked_addTrafficClue(const RsItem *pqi, uint32_t pktsize, std::list<RSTrafficClue> &lst); void locked_addTrafficClue(const RsItem *pqi, uint32_t pktsize, std::list<RSTrafficClue> &lst);
RsItem *addPartialPacket(const void *block, uint32_t len, uint32_t slice_packet_id,bool packet_starting,bool packet_ending,uint32_t& total_len); RsItem *addPartialPacket_locked(const void *block, uint32_t len, uint32_t slice_packet_id,bool packet_starting,bool packet_ending,uint32_t& total_len);
std::map<uint32_t,PartialPacketRecord> mPartialPackets ; std::map<uint32_t,PartialPacketRecord> mPartialPackets ;
}; };