removed debug info in pqistreamer

This commit is contained in:
csoler 2016-04-26 21:23:19 -04:00
parent 153db5ca64
commit 87764abe4c

View File

@ -61,6 +61,7 @@ static uint8_t PACKET_SLICING_PROBE_BYTES[8] = { 0x02, 0xaa, 0xbb, 0xcc, 0x00,
#define RSITEM_DEBUG 1 #define RSITEM_DEBUG 1
#define DEBUG_TRANSFERS 1 #define DEBUG_TRANSFERS 1
#define DEBUG_PQISTREAMER 1 #define DEBUG_PQISTREAMER 1
#define DEBUG_PACKET_SLICING 1
***/ ***/
#ifdef DEBUG_TRANSFERS #ifdef DEBUG_TRANSFERS
@ -112,8 +113,9 @@ pqistreamer::pqistreamer(RsSerialiser *rss, const RsPeerId& id, BinInterface *bi
pqistreamer::~pqistreamer() pqistreamer::~pqistreamer()
{ {
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
#ifdef DEBUG_PQISTREAMER
std::cerr << "Closing pqistreamer." << std::endl; std::cerr << "Closing pqistreamer." << std::endl;
#endif
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::~pqistreamer() Destruction!"); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::~pqistreamer() Destruction!");
if (mBio_flags & BIN_FLAGS_NO_CLOSE) if (mBio_flags & BIN_FLAGS_NO_CLOSE)
@ -467,8 +469,9 @@ int pqistreamer::handleoutgoing_locked()
{ {
/* if we are not active - clear anything in the queues. */ /* if we are not active - clear anything in the queues. */
locked_clear_out_queue() ; locked_clear_out_queue() ;
#ifdef DEBUG_PACKET_SLICING
std::cerr << "(II) Switching off packet slicing." << std::endl; std::cerr << "(II) Switching off packet slicing." << std::endl;
#endif
mAcceptsPacketSlicing = false ; mAcceptsPacketSlicing = false ;
/* also remove the pending packets */ /* also remove the pending packets */
@ -493,12 +496,12 @@ int pqistreamer::handleoutgoing_locked()
if ((!(mBio->cansend(0))) || (maxbytes < sentbytes)) if ((!(mBio->cansend(0))) || (maxbytes < sentbytes))
{ {
//#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PACKET_SLICING
if (maxbytes < sentbytes) if (maxbytes < sentbytes)
std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending: bio not ready. maxbytes=" << maxbytes << ", sentbytes=" << sentbytes << std::endl; std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending: bio not ready. maxbytes=" << maxbytes << ", sentbytes=" << sentbytes << std::endl;
else else
std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending: sentbytes=" << sentbytes << ", max=" << maxbytes << std::endl; std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending: sentbytes=" << sentbytes << ", max=" << maxbytes << std::endl;
//#endif #endif
return 0; return 0;
} }
@ -518,7 +521,9 @@ int pqistreamer::handleoutgoing_locked()
if((!mAcceptsPacketSlicing) && now > mLastSentPacketSlicingProbe + PQISTREAM_PACKET_SLICING_PROBE_DELAY) if((!mAcceptsPacketSlicing) && now > mLastSentPacketSlicingProbe + PQISTREAM_PACKET_SLICING_PROBE_DELAY)
{ {
#ifdef DEBUG_PACKET_SLICING
std::cerr << "(II) Inserting packet slicing probe in traffic" << std::endl; std::cerr << "(II) Inserting packet slicing probe in traffic" << std::endl;
#endif
mPkt_wpending_size = 8 ; mPkt_wpending_size = 8 ;
mPkt_wpending = rs_malloc(8) ; mPkt_wpending = rs_malloc(8) ;
@ -550,7 +555,9 @@ int pqistreamer::handleoutgoing_locked()
if(slice_starts && slice_ends) // good old method. Send the packet as is, since it's a full packet. if(slice_starts && slice_ends) // good old method. Send the packet as is, since it's a full packet.
{ {
std::cerr << "sending full slice, old style" << std::endl; #ifdef DEBUG_PACKET_SLICING
std::cerr << "sending full slice, old style. Size=" << slice_size << std::endl;
#endif
mPkt_wpending = realloc(mPkt_wpending,slice_size+mPkt_wpending_size) ; mPkt_wpending = realloc(mPkt_wpending,slice_size+mPkt_wpending_size) ;
memcpy( &((char*)mPkt_wpending)[mPkt_wpending_size],dta,slice_size) ; memcpy( &((char*)mPkt_wpending)[mPkt_wpending_size],dta,slice_size) ;
free(dta); free(dta);
@ -559,7 +566,9 @@ int pqistreamer::handleoutgoing_locked()
} }
else // partial packet. We make a special header for it and insert it in the stream else // partial packet. We make a special header for it and insert it in the stream
{ {
#ifdef DEBUG_PACKET_SLICING
std::cerr << "sending partial slice, packet ID=" << std::hex << slice_packet_id << std::dec << ", size=" << slice_size << std::endl; std::cerr << "sending partial slice, packet ID=" << std::hex << slice_packet_id << std::dec << ", size=" << slice_size << std::endl;
#endif
mPkt_wpending = realloc(mPkt_wpending,slice_size+mPkt_wpending_size+PQISTREAM_PARTIAL_PACKET_HEADER_SIZE) ; mPkt_wpending = realloc(mPkt_wpending,slice_size+mPkt_wpending_size+PQISTREAM_PARTIAL_PACKET_HEADER_SIZE) ;
memcpy( &((char*)mPkt_wpending)[mPkt_wpending_size+PQISTREAM_PARTIAL_PACKET_HEADER_SIZE],dta,slice_size) ; memcpy( &((char*)mPkt_wpending)[mPkt_wpending_size+PQISTREAM_PARTIAL_PACKET_HEADER_SIZE],dta,slice_size) ;
@ -745,7 +754,9 @@ start_packet_read:
if(!memcmp(block,PACKET_SLICING_PROBE_BYTES,8)) if(!memcmp(block,PACKET_SLICING_PROBE_BYTES,8))
{ {
mAcceptsPacketSlicing = true ; mAcceptsPacketSlicing = true ;
#ifdef DEBUG_PACKET_SLICING
std::cerr << "(II) Enabling packet slicing!" << std::endl; std::cerr << "(II) Enabling packet slicing!" << std::endl;
#endif
} }
} }
continue_packet: continue_packet:
@ -765,7 +776,9 @@ continue_packet:
extralen = (uint32_t(((uint8_t*)block)[6]) << 8 ) + (uint32_t(((uint8_t*)block)[7])); extralen = (uint32_t(((uint8_t*)block)[6]) << 8 ) + (uint32_t(((uint8_t*)block)[7]));
slice_packet_id = (uint32_t(((uint8_t*)block)[2]) << 24) + (uint32_t(((uint8_t*)block)[3]) << 16) + (uint32_t(((uint8_t*)block)[4]) << 8) + (uint32_t(((uint8_t*)block)[5]) << 0); slice_packet_id = (uint32_t(((uint8_t*)block)[2]) << 24) + (uint32_t(((uint8_t*)block)[3]) << 16) + (uint32_t(((uint8_t*)block)[4]) << 8) + (uint32_t(((uint8_t*)block)[5]) << 0);
#ifdef DEBUG_PACKET_SLICING
std::cerr << "Reading partial packet from mem block " << RsUtil::BinToHex((char*)block,8) << ": packet_id=" << std::hex << slice_packet_id << std::dec << ", len=" << extralen << std::endl; std::cerr << "Reading partial packet from mem block " << RsUtil::BinToHex((char*)block,8) << ": packet_id=" << std::hex << slice_packet_id << std::dec << ", len=" << extralen << std::endl;
#endif
is_partial_packet = true ; is_partial_packet = true ;
} }
else else
@ -905,19 +918,17 @@ continue_packet:
} }
#endif #endif
// std::cerr << "Deserializing packet of size " << pktlen <<std::endl ;
uint32_t pktlen = blen+extralen ; uint32_t pktlen = blen+extralen ;
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
std::cerr << "[" << (void*)pthread_self() << "] " << "deserializing. Size=" << pktlen << std::endl ; std::cerr << "[" << (void*)pthread_self() << "] " << RsUtil::BinToHex((char*)block,8) << "...: deserializing. Size=" << pktlen << std::endl ;
#endif #endif
RsItem *pkt ; RsItem *pkt ;
std::cerr << "Got packet with header " << RsUtil::BinToHex((char*)block,8) << std::endl;
if(is_partial_packet) if(is_partial_packet)
{ {
#ifdef DEBUG_PACKET_SLICING
std::cerr << "Inputing partial packet " << RsUtil::BinToHex((char*)block,8) << std::endl; std::cerr << "Inputing partial packet " << RsUtil::BinToHex((char*)block,8) << std::endl;
#endif
pkt = addPartialPacket(block,pktlen,slice_packet_id,is_packet_starting,is_packet_ending) ; pkt = addPartialPacket(block,pktlen,slice_packet_id,is_packet_starting,is_packet_ending) ;
} }
else else
@ -963,7 +974,9 @@ continue_packet:
RsItem *pqistreamer::addPartialPacket(const void *block,uint32_t len,uint32_t slice_packet_id,bool is_packet_starting,bool is_packet_ending) RsItem *pqistreamer::addPartialPacket(const void *block,uint32_t len,uint32_t slice_packet_id,bool is_packet_starting,bool is_packet_ending)
{ {
#ifdef DEBUG_PACKET_SLICING
std::cerr << "Receiving partial packet. size=" << len << ", ID=" << std::hex << slice_packet_id << std::dec << ", starting:" << is_packet_starting << ", ending:" << is_packet_ending ; std::cerr << "Receiving partial packet. size=" << len << ", ID=" << std::hex << slice_packet_id << std::dec << ", starting:" << is_packet_starting << ", ending:" << is_packet_ending ;
#endif
if(is_packet_starting && is_packet_ending) if(is_packet_starting && is_packet_ending)
{ {
@ -998,7 +1011,9 @@ RsItem *pqistreamer::addPartialPacket(const void *block,uint32_t len,uint32_t sl
memcpy(rec.mem, slice_data, slice_length) ; ; memcpy(rec.mem, slice_data, slice_length) ; ;
rec.size = slice_length ; rec.size = slice_length ;
#ifdef DEBUG_PACKET_SLICING
std::cerr << " => stored in new record (size=" << rec.size << std::endl; std::cerr << " => stored in new record (size=" << rec.size << std::endl;
#endif
return NULL ; // no need to check for ending return NULL ; // no need to check for ending
} }
@ -1018,11 +1033,15 @@ RsItem *pqistreamer::addPartialPacket(const void *block,uint32_t len,uint32_t sl
memcpy( &((char*)rec.mem)[rec.size],slice_data,slice_length) ; memcpy( &((char*)rec.mem)[rec.size],slice_data,slice_length) ;
rec.size += slice_length ; rec.size += slice_length ;
#ifdef DEBUG_PACKET_SLICING
std::cerr << " => added to existing record size=" << rec.size ; std::cerr << " => added to existing record size=" << rec.size ;
#endif
if(is_packet_ending) if(is_packet_ending)
{ {
#ifdef DEBUG_PACKET_SLICING
std::cerr << " => deserialising: mem=" << RsUtil::BinToHex((char*)rec.mem,std::min(8u,rec.size)) << std::endl; std::cerr << " => deserialising: mem=" << RsUtil::BinToHex((char*)rec.mem,std::min(8u,rec.size)) << std::endl;
#endif
RsItem *item = mRsSerialiser->deserialise(rec.mem, &rec.size); RsItem *item = mRsSerialiser->deserialise(rec.mem, &rec.size);
free(rec.mem) ; free(rec.mem) ;
@ -1031,7 +1050,9 @@ RsItem *pqistreamer::addPartialPacket(const void *block,uint32_t len,uint32_t sl
} }
else else
{ {
#ifdef DEBUG_PACKET_SLICING
std::cerr << std::endl; std::cerr << std::endl;
#endif
return NULL ; return NULL ;
} }
} }