diff --git a/libretroshare/src/pqi/pqiqos.cc b/libretroshare/src/pqi/pqiqos.cc index 1453f559b..a8678ca8c 100644 --- a/libretroshare/src/pqi/pqiqos.cc +++ b/libretroshare/src/pqi/pqiqos.cc @@ -6,7 +6,7 @@ #include "pqiqos.h" -static const uint32_t MAX_COUNTER_VALUE = 1024u*1024u ; // 2^20 +const uint32_t pqiQoS::MAX_PACKET_COUNTER_VALUE = (1 << 24) ; pqiQoS::pqiQoS(uint32_t nb_levels,float alpha) : _item_queues(nb_levels),_alpha(alpha) @@ -16,6 +16,7 @@ pqiQoS::pqiQoS(uint32_t nb_levels,float alpha) float c = 1.0f ; float inc = alpha ; _nb_items = 0 ; + _id_counter = 0 ; for(int i=((int)nb_levels)-1;i>=0;--i,c *= alpha) { @@ -57,7 +58,7 @@ void pqiQoS::in_rsItem(void *ptr,int size,int priority) _item_queues[priority].push(ptr,size,_id_counter++) ; ++_nb_items ; - if(_id_counter >= MAX_COUNTER_VALUE) + if(_id_counter >= MAX_PACKET_COUNTER_VALUE) _id_counter = 0 ; } diff --git a/libretroshare/src/pqi/pqiqos.h b/libretroshare/src/pqi/pqiqos.h index 438f45622..9a4bfbb15 100644 --- a/libretroshare/src/pqi/pqiqos.h +++ b/libretroshare/src/pqi/pqiqos.h @@ -45,141 +45,141 @@ class pqiQoS { +public: + pqiQoS(uint32_t max_levels,float alpha) ; + + struct ItemRecord + { + void *data ; + uint32_t current_offset ; + uint32_t size ; + uint32_t id ; + }; + + class ItemQueue + { public: - pqiQoS(uint32_t max_levels,float alpha) ; - - struct ItemRecord - { - void *data ; - uint32_t current_offset ; - uint32_t size ; - uint32_t id ; - }; - - class ItemQueue + ItemQueue() { - public: - ItemQueue() - { - _item_count =0 ; - } - void *pop() - { - if(_items.empty()) - return NULL ; + _item_count =0 ; + } + void *pop() + { + if(_items.empty()) + return NULL ; - void *item = _items.front().data ; + void *item = _items.front().data ; + _items.pop_front() ; + --_item_count ; + + return item ; + } + + void *slice(uint32_t max_size,uint32_t& offset,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id) + { + if(_items.empty()) + return NULL ; + + ItemRecord& rec(_items.front()) ; + packet_id = rec.id ; + + // readily get rid of the item if it can be sent as a whole + + if(rec.current_offset == 0 && rec.size < max_size) + { + offset = 0 ; + starts = true ; + ends = true ; + size = rec.size ; + + return pop() ; + } + starts = (rec.current_offset == 0) ; + ends = (rec.current_offset + max_size >= rec.size) ; + + if(rec.size <= rec.current_offset) + { + std::cerr << "(EE) severe error in slicing in QoS." << std::endl; + pop() ; + return NULL ; + } + + size = std::min(max_size, uint32_t((int)rec.size - (int)rec.current_offset)) ; + void *mem = rs_malloc(size) ; + + if(!mem) + { + std::cerr << "(EE) memory allocation error in QoS." << std::endl; + pop() ; + return NULL ; + } + + memcpy(mem,&((unsigned char*)rec.data)[rec.current_offset],size) ; + + if(ends) // we're taking the whole stuff. So we can delete the entry. + { + free(rec.data) ; _items.pop_front() ; - --_item_count ; - - return item ; } - - void *slice(uint32_t max_size,uint32_t& offset,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id) - { - if(_items.empty()) - return NULL ; - - ItemRecord& rec(_items.front()) ; - packet_id = rec.id ; - - // readily get rid of the item if it can be sent as a whole - - if(rec.current_offset == 0 && rec.size < max_size) - { - offset = 0 ; - starts = true ; - ends = true ; - size = rec.size ; - - return pop() ; - } - starts = (rec.current_offset == 0) ; - ends = (rec.current_offset + max_size > rec.size) ; - - if(rec.size < rec.current_offset) - { - std::cerr << "(EE) severe error in slicing in QoS." << std::endl; - pop() ; - return NULL ; - } - - size = std::min(max_size, uint32_t((int)rec.size - (int)rec.current_offset)) ; - void *mem = rs_malloc(size) ; - - if(!mem) - { - std::cerr << "(EE) memory allocation error in QoS." << std::endl; - pop() ; - return NULL ; - } - - memcpy(mem,&((unsigned char*)rec.data)[rec.current_offset],size) ; - - if(ends) // we're taking the whole stuff. So we can delete the entry. - { - free(rec.data) ; - _items.pop_front() ; - } - else - rec.current_offset += size ; - - return mem ; - } + else + rec.current_offset += size ; // by construction, !ends implies rec.current_offset < rec.size - void push(void *item,uint32_t size,uint32_t id) - { - ItemRecord rec ; - - rec.data = item ; - rec.current_offset = 0 ; - rec.size = size ; - rec.id = id ; - - _items.push_back(rec) ; - ++_item_count ; - } + return mem ; + } - uint32_t size() const { return _item_count ; } + void push(void *item,uint32_t size,uint32_t id) + { + ItemRecord rec ; - float _threshold ; - float _counter ; - float _inc ; - uint32_t _item_count ; - - std::list _items ; - }; + rec.data = item ; + rec.current_offset = 0 ; + rec.size = size ; + rec.id = id ; - // This function pops items from the queue, y order of priority - // - void *out_rsItem() ; - void *out_rsItem(uint32_t max_slice_size,uint32_t offset_unit,uint32_t& offset,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id) ; + _items.push_back(rec) ; + } - // This function is used to queue items. - // - void in_rsItem(void *item, int size, int priority) ; + uint32_t size() const { return _item_count ; } - void print() const ; - uint64_t qos_queue_size() const { return _nb_items ; } + float _threshold ; + float _counter ; + float _inc ; + uint32_t _item_count ; - // kills all waiting items. - void clear() ; + std::list _items ; + }; - // get some stats about what's going on. service_packets will contain the number of - // packets per service, and queue_sizes will contain the size of the different priority queues. + // This function pops items from the queue, y order of priority + // + void *out_rsItem() ; + void *out_rsItem(uint32_t max_slice_size,uint32_t offset_unit,uint32_t& offset,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id) ; - //int gatherStatistics(std::vector& per_service_count,std::vector& per_priority_count) const ; + // This function is used to queue items. + // + void in_rsItem(void *item, int size, int priority) ; - void computeTotalItemSize() const ; - int debug_computeTotalItemSize() const ; - private: - // This vector stores the lists of items with equal priorities. - // - std::vector _item_queues ; - float _alpha ; - uint64_t _nb_items ; - - uint32_t _id_counter ; + void print() const ; + uint64_t qos_queue_size() const { return _nb_items ; } + + // kills all waiting items. + void clear() ; + + // get some stats about what's going on. service_packets will contain the number of + // packets per service, and queue_sizes will contain the size of the different priority queues. + + //int gatherStatistics(std::vector& per_service_count,std::vector& per_priority_count) const ; + + void computeTotalItemSize() const ; + int debug_computeTotalItemSize() const ; +private: + // This vector stores the lists of items with equal priorities. + // + std::vector _item_queues ; + float _alpha ; + uint64_t _nb_items ; + uint32_t _id_counter ; + + static const uint32_t MAX_PACKET_COUNTER_VALUE ; }; diff --git a/libretroshare/src/pqi/pqistreamer.cc b/libretroshare/src/pqi/pqistreamer.cc index e1bac3245..1922cb823 100644 --- a/libretroshare/src/pqi/pqistreamer.cc +++ b/libretroshare/src/pqi/pqistreamer.cc @@ -45,8 +45,8 @@ static const float PQISTREAM_AVG_FRAC = 0.8; // for bandpass filter over static const int PQISTREAM_OPTIMAL_PACKET_SIZE = 512; // It is believed that this value should be lower than TCP slices and large enough as compare to encryption padding. // most importantly, it should be constant, so as to allow correct QoS. static const int PQISTREAM_OPTIMAL_SLICE_OFFSET_UNIT = 16 ; // slices offset in units of 16 bits. That allows bigger numbers encoded in 4 less bits. -static const int PQISTREAM_SLICE_FLAG_ENDS = 0x01; // these flags should be kept in the range 0x01-0x08 -static const int PQISTREAM_SLICE_FLAG_STARTS = 0x02; // +static const int PQISTREAM_SLICE_FLAG_STARTS = 0x01; // +static const int PQISTREAM_SLICE_FLAG_ENDS = 0x02; // these flags should be kept in the range 0x01-0x08 static const int PQISTREAM_SLICE_PROTOCOL_VERSION_ID = 0x10; // Protocol version ID. Should hold on the 4 lower bits. static const int PQISTREAM_PARTIAL_PACKET_HEADER_SIZE = 8; // Same size than normal header, to make the code simpler. @@ -527,6 +527,7 @@ int pqistreamer::handleoutgoing_locked() if(slice_starts && slice_ends) // good old method. Send the packet as is, since it's a full packet. { + std::cerr << "sending full slice, old style" << std::endl; mPkt_wpending = realloc(mPkt_wpending,slice_size+mPkt_wpending_size) ; memcpy( &((char*)mPkt_wpending)[mPkt_wpending_size],dta,slice_size) ; free(dta); @@ -535,6 +536,8 @@ int pqistreamer::handleoutgoing_locked() } else // partial packet. We make a special header for it and insert it in the stream { + std::cerr << "sending partial slice, packet ID=" << std::hex << slice_packet_id << std::dec << ", offset=" << slice_offset << ", size=" << slice_size << std::endl; + mPkt_wpending = realloc(mPkt_wpending,slice_size+mPkt_wpending_size+PQISTREAM_PARTIAL_PACKET_HEADER_SIZE) ; memcpy( &((char*)mPkt_wpending)[mPkt_wpending_size+PQISTREAM_PARTIAL_PACKET_HEADER_SIZE],dta,slice_size) ; free(dta); @@ -723,16 +726,17 @@ continue_packet: bool is_packet_starting = (((char*)block)[0] == 0x11) ; bool is_packet_ending = (((char*)block)[0] == 0x12) ; - int extralen =0; - int slice_offset = 0 ; - int slice_packet_id =0; + uint32_t extralen =0; + uint32_t slice_offset = 0 ; + uint32_t slice_packet_id =0; if( ((char*)block)[0] == 0x10 || ((char*)block)[0] == 0x11 || ((char*)block)[0] == 0x12) { - extralen = (int(((char*)block)[6]) << 8) + (int(((char*)block)[7])); - slice_offset = (int(((char*)block)[5]) << 4) + (int(((char*)block)[4]) << 12); - slice_packet_id = (int(((char*)block)[3]) << 0) + (int(((char*)block)[2]) << 8) + (int(((char*)block)[1]) << 16); + extralen = (uint32_t(((uint8_t*)block)[6]) << 8) + (uint32_t(((uint8_t*)block)[7])); + slice_offset = (uint32_t(((uint8_t*)block)[5]) << 4) + (uint32_t(((uint8_t*)block)[4]) << 12); + slice_packet_id = (uint32_t(((uint8_t*)block)[3]) << 0) + (uint32_t(((uint8_t*)block)[2]) << 8) + (uint32_t(((uint8_t*)block)[1]) << 16); + std::cerr << "Reading from mem block " << RsUtil::BinToHex((char*)block,8) << ": packet_id=" << slice_packet_id << ", len=" << extralen << ", offset=" << slice_offset << std::endl; is_partial_packet = true ; } else @@ -902,19 +906,17 @@ continue_packet: std::cerr << "[" << (void*)pthread_self() << "] " << "deserializing. Size=" << pktlen << std::endl ; #endif RsItem *pkt ; + std::cerr << "Got packet with header " << RsUtil::BinToHex((char*)block,8) << std::endl; if(is_partial_packet) { std::cerr << "Inputing partial packet " << RsUtil::BinToHex((char*)block,8) << std::endl; - pkt = addPartialPacket(block,extralen,slice_offset,slice_packet_id,is_packet_starting,is_packet_ending) ; + pkt = addPartialPacket(block,pktlen,slice_offset,slice_packet_id,is_packet_starting,is_packet_ending) ; } else pkt = mRsSerialiser->deserialise(block, &pktlen); - - - std::cerr << "Got packet with header " << RsUtil::BinToHex((char*)block,8) << std::endl; - + if ((pkt != NULL) && (0 < handleincomingitem_locked(pkt,pktlen))) { #ifdef DEBUG_PQISTREAMER @@ -953,15 +955,19 @@ continue_packet: return 0; } -RsItem *pqistreamer::addPartialPacket(void *block,uint32_t len,uint32_t slice_offset,uint32_t slice_packet_id,bool is_packet_starting,bool is_packet_ending) +RsItem *pqistreamer::addPartialPacket(const void *block,uint32_t len,uint32_t slice_offset,uint32_t slice_packet_id,bool is_packet_starting,bool is_packet_ending) { + std::cerr << "Receiving partial packet. size=" << len << ", offset=" << slice_offset << ". ID=" << std::hex << slice_packet_id << std::dec << ", starting:" << is_packet_starting << ", ending:" << is_packet_ending ; + if(is_packet_starting && is_packet_ending) { std::cerr << "(EE) unexpected situation. Got in addPartialPacket() a full packet both starting and ending" << std::endl; - free(block) ; return NULL ; } + uint32_t slice_length = len - PQISTREAM_PARTIAL_PACKET_HEADER_SIZE ; + unsigned char *slice_data = &((unsigned char*)block)[PQISTREAM_PARTIAL_PACKET_HEADER_SIZE] ; + std::map::iterator it = mPartialPackets.find(slice_packet_id) ; if(it == mPartialPackets.end()) @@ -971,44 +977,57 @@ RsItem *pqistreamer::addPartialPacket(void *block,uint32_t len,uint32_t slice_of if(!is_packet_starting) { std::cerr << "(EE) dropping non starting packet that has no record." << std::endl; - free(block) ; + return NULL ; + } + PartialPacketRecord& rec = mPartialPackets[slice_packet_id] ; + + rec.mem = rs_malloc(slice_length) ; + + if(!rec.mem) + { + std::cerr << "(EE) Cannot allowcate memory for slice of size " << slice_length << std::endl; return NULL ; } - mPartialPackets[slice_packet_id].mem = block ; - mPartialPackets[slice_packet_id].size = len ; + memcpy(rec.mem, slice_data, slice_length) ; ; + rec.size = slice_length ; + + std::cerr << " => stored in new record." << std::endl; return NULL ; // no need to check for ending } else { + PartialPacketRecord& rec = it->second ; + if(is_packet_starting) { std::cerr << "(WW) dropping unfinished existing packet that gets to be replaced by new starting packet." << std::endl; - free(it->second.mem) ; - it->second.mem = block ; - it->second.size = len ; - - return NULL ; + free(rec.mem); + rec.size = 0 ; } // make sure this is a continuing packet, otherwise this is an error. - it->second.mem = realloc(it->second.mem, it->second.size + len) ; - memcpy( &((char*)it->second.mem)[it->second.size],block,len) ; - it->second.size += len ; - free(block) ; + rec.mem = realloc(rec.mem, rec.size + slice_length) ; + memcpy( &((char*)rec.mem)[rec.size],slice_data,slice_length) ; + rec.size += slice_length ; + + std::cerr << " => added to existing record " ; if(is_packet_ending) { - RsItem *item = mRsSerialiser->deserialise(it->second.mem, &it->second.size); + std::cerr << " => deserialising: mem=" << RsUtil::BinToHex((char*)rec.mem,std::min(20u,rec.size)) << std::endl; + RsItem *item = mRsSerialiser->deserialise(rec.mem, &rec.size); - free(it->second.mem) ; + free(rec.mem) ; mPartialPackets.erase(it) ; - return item ; } else + { + std::cerr << std::endl; return NULL ; + } } } diff --git a/libretroshare/src/pqi/pqistreamer.h b/libretroshare/src/pqi/pqistreamer.h index 79c5fbefc..27badc004 100644 --- a/libretroshare/src/pqi/pqistreamer.h +++ b/libretroshare/src/pqi/pqistreamer.h @@ -162,7 +162,7 @@ class pqistreamer: public PQInterface time_t mStatisticsTimeStamp ; void locked_addTrafficClue(const RsItem *pqi, uint32_t pktsize, std::list &lst); - RsItem *addPartialPacket(void *block, uint32_t len, uint32_t slice_offset, uint32_t slice_packet_id,bool packet_starting,bool packet_ending); + RsItem *addPartialPacket(const void *block, uint32_t len, uint32_t slice_offset, uint32_t slice_packet_id,bool packet_starting,bool packet_ending); std::map mPartialPackets ; };