fixed a few bugs in packet slicing in pqiQoS

This commit is contained in:
csoler 2016-04-24 21:18:44 -04:00
parent a2c11f97c2
commit f62957534e
4 changed files with 173 additions and 153 deletions

View File

@ -6,7 +6,7 @@
#include "pqiqos.h" #include "pqiqos.h"
static const uint32_t MAX_COUNTER_VALUE = 1024u*1024u ; // 2^20 const uint32_t pqiQoS::MAX_PACKET_COUNTER_VALUE = (1 << 24) ;
pqiQoS::pqiQoS(uint32_t nb_levels,float alpha) pqiQoS::pqiQoS(uint32_t nb_levels,float alpha)
: _item_queues(nb_levels),_alpha(alpha) : _item_queues(nb_levels),_alpha(alpha)
@ -16,6 +16,7 @@ pqiQoS::pqiQoS(uint32_t nb_levels,float alpha)
float c = 1.0f ; float c = 1.0f ;
float inc = alpha ; float inc = alpha ;
_nb_items = 0 ; _nb_items = 0 ;
_id_counter = 0 ;
for(int i=((int)nb_levels)-1;i>=0;--i,c *= alpha) for(int i=((int)nb_levels)-1;i>=0;--i,c *= alpha)
{ {
@ -57,7 +58,7 @@ void pqiQoS::in_rsItem(void *ptr,int size,int priority)
_item_queues[priority].push(ptr,size,_id_counter++) ; _item_queues[priority].push(ptr,size,_id_counter++) ;
++_nb_items ; ++_nb_items ;
if(_id_counter >= MAX_COUNTER_VALUE) if(_id_counter >= MAX_PACKET_COUNTER_VALUE)
_id_counter = 0 ; _id_counter = 0 ;
} }

View File

@ -45,7 +45,7 @@
class pqiQoS class pqiQoS
{ {
public: public:
pqiQoS(uint32_t max_levels,float alpha) ; pqiQoS(uint32_t max_levels,float alpha) ;
struct ItemRecord struct ItemRecord
@ -95,9 +95,9 @@ class pqiQoS
return pop() ; return pop() ;
} }
starts = (rec.current_offset == 0) ; starts = (rec.current_offset == 0) ;
ends = (rec.current_offset + max_size > rec.size) ; ends = (rec.current_offset + max_size >= rec.size) ;
if(rec.size < rec.current_offset) if(rec.size <= rec.current_offset)
{ {
std::cerr << "(EE) severe error in slicing in QoS." << std::endl; std::cerr << "(EE) severe error in slicing in QoS." << std::endl;
pop() ; pop() ;
@ -122,7 +122,7 @@ class pqiQoS
_items.pop_front() ; _items.pop_front() ;
} }
else else
rec.current_offset += size ; rec.current_offset += size ; // by construction, !ends implies rec.current_offset < rec.size
return mem ; return mem ;
} }
@ -137,7 +137,6 @@ class pqiQoS
rec.id = id ; rec.id = id ;
_items.push_back(rec) ; _items.push_back(rec) ;
++_item_count ;
} }
uint32_t size() const { return _item_count ; } uint32_t size() const { return _item_count ; }
@ -172,14 +171,15 @@ class pqiQoS
void computeTotalItemSize() const ; void computeTotalItemSize() const ;
int debug_computeTotalItemSize() const ; int debug_computeTotalItemSize() const ;
private: private:
// This vector stores the lists of items with equal priorities. // This vector stores the lists of items with equal priorities.
// //
std::vector<ItemQueue> _item_queues ; std::vector<ItemQueue> _item_queues ;
float _alpha ; float _alpha ;
uint64_t _nb_items ; uint64_t _nb_items ;
uint32_t _id_counter ; uint32_t _id_counter ;
static const uint32_t MAX_PACKET_COUNTER_VALUE ;
}; };

View File

@ -45,8 +45,8 @@ static const float PQISTREAM_AVG_FRAC = 0.8; // for bandpass filter over
static const int PQISTREAM_OPTIMAL_PACKET_SIZE = 512; // It is believed that this value should be lower than TCP slices and large enough as compare to encryption padding. static const int PQISTREAM_OPTIMAL_PACKET_SIZE = 512; // It is believed that this value should be lower than TCP slices and large enough as compare to encryption padding.
// most importantly, it should be constant, so as to allow correct QoS. // most importantly, it should be constant, so as to allow correct QoS.
static const int PQISTREAM_OPTIMAL_SLICE_OFFSET_UNIT = 16 ; // slices offset in units of 16 bits. That allows bigger numbers encoded in 4 less bits. static const int PQISTREAM_OPTIMAL_SLICE_OFFSET_UNIT = 16 ; // slices offset in units of 16 bits. That allows bigger numbers encoded in 4 less bits.
static const int PQISTREAM_SLICE_FLAG_ENDS = 0x01; // these flags should be kept in the range 0x01-0x08 static const int PQISTREAM_SLICE_FLAG_STARTS = 0x01; //
static const int PQISTREAM_SLICE_FLAG_STARTS = 0x02; // static const int PQISTREAM_SLICE_FLAG_ENDS = 0x02; // these flags should be kept in the range 0x01-0x08
static const int PQISTREAM_SLICE_PROTOCOL_VERSION_ID = 0x10; // Protocol version ID. Should hold on the 4 lower bits. static const int PQISTREAM_SLICE_PROTOCOL_VERSION_ID = 0x10; // Protocol version ID. Should hold on the 4 lower bits.
static const int PQISTREAM_PARTIAL_PACKET_HEADER_SIZE = 8; // Same size than normal header, to make the code simpler. static const int PQISTREAM_PARTIAL_PACKET_HEADER_SIZE = 8; // Same size than normal header, to make the code simpler.
@ -527,6 +527,7 @@ int pqistreamer::handleoutgoing_locked()
if(slice_starts && slice_ends) // good old method. Send the packet as is, since it's a full packet. if(slice_starts && slice_ends) // good old method. Send the packet as is, since it's a full packet.
{ {
std::cerr << "sending full slice, old style" << std::endl;
mPkt_wpending = realloc(mPkt_wpending,slice_size+mPkt_wpending_size) ; mPkt_wpending = realloc(mPkt_wpending,slice_size+mPkt_wpending_size) ;
memcpy( &((char*)mPkt_wpending)[mPkt_wpending_size],dta,slice_size) ; memcpy( &((char*)mPkt_wpending)[mPkt_wpending_size],dta,slice_size) ;
free(dta); free(dta);
@ -535,6 +536,8 @@ int pqistreamer::handleoutgoing_locked()
} }
else // partial packet. We make a special header for it and insert it in the stream else // partial packet. We make a special header for it and insert it in the stream
{ {
std::cerr << "sending partial slice, packet ID=" << std::hex << slice_packet_id << std::dec << ", offset=" << slice_offset << ", size=" << slice_size << std::endl;
mPkt_wpending = realloc(mPkt_wpending,slice_size+mPkt_wpending_size+PQISTREAM_PARTIAL_PACKET_HEADER_SIZE) ; mPkt_wpending = realloc(mPkt_wpending,slice_size+mPkt_wpending_size+PQISTREAM_PARTIAL_PACKET_HEADER_SIZE) ;
memcpy( &((char*)mPkt_wpending)[mPkt_wpending_size+PQISTREAM_PARTIAL_PACKET_HEADER_SIZE],dta,slice_size) ; memcpy( &((char*)mPkt_wpending)[mPkt_wpending_size+PQISTREAM_PARTIAL_PACKET_HEADER_SIZE],dta,slice_size) ;
free(dta); free(dta);
@ -723,16 +726,17 @@ continue_packet:
bool is_packet_starting = (((char*)block)[0] == 0x11) ; bool is_packet_starting = (((char*)block)[0] == 0x11) ;
bool is_packet_ending = (((char*)block)[0] == 0x12) ; bool is_packet_ending = (((char*)block)[0] == 0x12) ;
int extralen =0; uint32_t extralen =0;
int slice_offset = 0 ; uint32_t slice_offset = 0 ;
int slice_packet_id =0; uint32_t slice_packet_id =0;
if( ((char*)block)[0] == 0x10 || ((char*)block)[0] == 0x11 || ((char*)block)[0] == 0x12) if( ((char*)block)[0] == 0x10 || ((char*)block)[0] == 0x11 || ((char*)block)[0] == 0x12)
{ {
extralen = (int(((char*)block)[6]) << 8) + (int(((char*)block)[7])); extralen = (uint32_t(((uint8_t*)block)[6]) << 8) + (uint32_t(((uint8_t*)block)[7]));
slice_offset = (int(((char*)block)[5]) << 4) + (int(((char*)block)[4]) << 12); slice_offset = (uint32_t(((uint8_t*)block)[5]) << 4) + (uint32_t(((uint8_t*)block)[4]) << 12);
slice_packet_id = (int(((char*)block)[3]) << 0) + (int(((char*)block)[2]) << 8) + (int(((char*)block)[1]) << 16); slice_packet_id = (uint32_t(((uint8_t*)block)[3]) << 0) + (uint32_t(((uint8_t*)block)[2]) << 8) + (uint32_t(((uint8_t*)block)[1]) << 16);
std::cerr << "Reading from mem block " << RsUtil::BinToHex((char*)block,8) << ": packet_id=" << slice_packet_id << ", len=" << extralen << ", offset=" << slice_offset << std::endl;
is_partial_packet = true ; is_partial_packet = true ;
} }
else else
@ -902,19 +906,17 @@ continue_packet:
std::cerr << "[" << (void*)pthread_self() << "] " << "deserializing. Size=" << pktlen << std::endl ; std::cerr << "[" << (void*)pthread_self() << "] " << "deserializing. Size=" << pktlen << std::endl ;
#endif #endif
RsItem *pkt ; RsItem *pkt ;
std::cerr << "Got packet with header " << RsUtil::BinToHex((char*)block,8) << std::endl;
if(is_partial_packet) if(is_partial_packet)
{ {
std::cerr << "Inputing partial packet " << RsUtil::BinToHex((char*)block,8) << std::endl; std::cerr << "Inputing partial packet " << RsUtil::BinToHex((char*)block,8) << std::endl;
pkt = addPartialPacket(block,extralen,slice_offset,slice_packet_id,is_packet_starting,is_packet_ending) ; pkt = addPartialPacket(block,pktlen,slice_offset,slice_packet_id,is_packet_starting,is_packet_ending) ;
} }
else else
pkt = mRsSerialiser->deserialise(block, &pktlen); pkt = mRsSerialiser->deserialise(block, &pktlen);
std::cerr << "Got packet with header " << RsUtil::BinToHex((char*)block,8) << std::endl;
if ((pkt != NULL) && (0 < handleincomingitem_locked(pkt,pktlen))) if ((pkt != NULL) && (0 < handleincomingitem_locked(pkt,pktlen)))
{ {
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
@ -953,15 +955,19 @@ continue_packet:
return 0; return 0;
} }
RsItem *pqistreamer::addPartialPacket(void *block,uint32_t len,uint32_t slice_offset,uint32_t slice_packet_id,bool is_packet_starting,bool is_packet_ending) RsItem *pqistreamer::addPartialPacket(const void *block,uint32_t len,uint32_t slice_offset,uint32_t slice_packet_id,bool is_packet_starting,bool is_packet_ending)
{ {
std::cerr << "Receiving partial packet. size=" << len << ", offset=" << slice_offset << ". ID=" << std::hex << slice_packet_id << std::dec << ", starting:" << is_packet_starting << ", ending:" << is_packet_ending ;
if(is_packet_starting && is_packet_ending) if(is_packet_starting && is_packet_ending)
{ {
std::cerr << "(EE) unexpected situation. Got in addPartialPacket() a full packet both starting and ending" << std::endl; std::cerr << "(EE) unexpected situation. Got in addPartialPacket() a full packet both starting and ending" << std::endl;
free(block) ;
return NULL ; return NULL ;
} }
uint32_t slice_length = len - PQISTREAM_PARTIAL_PACKET_HEADER_SIZE ;
unsigned char *slice_data = &((unsigned char*)block)[PQISTREAM_PARTIAL_PACKET_HEADER_SIZE] ;
std::map<uint32_t,PartialPacketRecord>::iterator it = mPartialPackets.find(slice_packet_id) ; std::map<uint32_t,PartialPacketRecord>::iterator it = mPartialPackets.find(slice_packet_id) ;
if(it == mPartialPackets.end()) if(it == mPartialPackets.end())
@ -971,45 +977,58 @@ RsItem *pqistreamer::addPartialPacket(void *block,uint32_t len,uint32_t slice_of
if(!is_packet_starting) if(!is_packet_starting)
{ {
std::cerr << "(EE) dropping non starting packet that has no record." << std::endl; std::cerr << "(EE) dropping non starting packet that has no record." << std::endl;
free(block) ; return NULL ;
}
PartialPacketRecord& rec = mPartialPackets[slice_packet_id] ;
rec.mem = rs_malloc(slice_length) ;
if(!rec.mem)
{
std::cerr << "(EE) Cannot allowcate memory for slice of size " << slice_length << std::endl;
return NULL ; return NULL ;
} }
mPartialPackets[slice_packet_id].mem = block ; memcpy(rec.mem, slice_data, slice_length) ; ;
mPartialPackets[slice_packet_id].size = len ; rec.size = slice_length ;
std::cerr << " => stored in new record." << std::endl;
return NULL ; // no need to check for ending return NULL ; // no need to check for ending
} }
else else
{ {
PartialPacketRecord& rec = it->second ;
if(is_packet_starting) if(is_packet_starting)
{ {
std::cerr << "(WW) dropping unfinished existing packet that gets to be replaced by new starting packet." << std::endl; std::cerr << "(WW) dropping unfinished existing packet that gets to be replaced by new starting packet." << std::endl;
free(it->second.mem) ; free(rec.mem);
it->second.mem = block ; rec.size = 0 ;
it->second.size = len ;
return NULL ;
} }
// make sure this is a continuing packet, otherwise this is an error. // make sure this is a continuing packet, otherwise this is an error.
it->second.mem = realloc(it->second.mem, it->second.size + len) ; rec.mem = realloc(rec.mem, rec.size + slice_length) ;
memcpy( &((char*)it->second.mem)[it->second.size],block,len) ; memcpy( &((char*)rec.mem)[rec.size],slice_data,slice_length) ;
it->second.size += len ; rec.size += slice_length ;
free(block) ;
std::cerr << " => added to existing record " ;
if(is_packet_ending) if(is_packet_ending)
{ {
RsItem *item = mRsSerialiser->deserialise(it->second.mem, &it->second.size); std::cerr << " => deserialising: mem=" << RsUtil::BinToHex((char*)rec.mem,std::min(20u,rec.size)) << std::endl;
RsItem *item = mRsSerialiser->deserialise(rec.mem, &rec.size);
free(it->second.mem) ; free(rec.mem) ;
mPartialPackets.erase(it) ; mPartialPackets.erase(it) ;
return item ; return item ;
} }
else else
{
std::cerr << std::endl;
return NULL ; return NULL ;
} }
}
} }
/* BandWidth Management Assistance */ /* BandWidth Management Assistance */

View File

@ -162,7 +162,7 @@ class pqistreamer: public PQInterface
time_t mStatisticsTimeStamp ; time_t mStatisticsTimeStamp ;
void locked_addTrafficClue(const RsItem *pqi, uint32_t pktsize, std::list<RSTrafficClue> &lst); void locked_addTrafficClue(const RsItem *pqi, uint32_t pktsize, std::list<RSTrafficClue> &lst);
RsItem *addPartialPacket(void *block, uint32_t len, uint32_t slice_offset, uint32_t slice_packet_id,bool packet_starting,bool packet_ending); RsItem *addPartialPacket(const void *block, uint32_t len, uint32_t slice_offset, uint32_t slice_packet_id,bool packet_starting,bool packet_ending);
std::map<uint32_t,PartialPacketRecord> mPartialPackets ; std::map<uint32_t,PartialPacketRecord> mPartialPackets ;
}; };