half-implemented packet slicing (QoS part done)

This commit is contained in:
csoler 2016-04-20 22:42:09 -04:00
parent fc725b03a8
commit e82c217cd2
5 changed files with 179 additions and 50 deletions

View File

@ -6,6 +6,8 @@
#include "pqiqos.h" #include "pqiqos.h"
static const uint32_t MAX_COUNTER_VALUE = 1024u*1024u ; // 2^20
pqiQoS::pqiQoS(uint32_t nb_levels,float alpha) pqiQoS::pqiQoS(uint32_t nb_levels,float alpha)
: _item_queues(nb_levels),_alpha(alpha) : _item_queues(nb_levels),_alpha(alpha)
{ {
@ -44,7 +46,7 @@ void pqiQoS::print() const
std::cerr << std::endl; std::cerr << std::endl;
} }
void pqiQoS::in_rsItem(void *ptr,int priority) void pqiQoS::in_rsItem(void *ptr,int size,int priority)
{ {
if(uint32_t(priority) >= _item_queues.size()) if(uint32_t(priority) >= _item_queues.size())
{ {
@ -52,8 +54,11 @@ void pqiQoS::in_rsItem(void *ptr,int priority)
priority = _item_queues.size()-1 ; priority = _item_queues.size()-1 ;
} }
_item_queues[priority].push(ptr) ; _item_queues[priority].push(ptr,size,_id_counter++) ;
++_nb_items ; ++_nb_items ;
if(_id_counter >= MAX_COUNTER_VALUE)
_id_counter = 0 ;
} }
// int pqiQoS::gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const // int pqiQoS::gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const
@ -81,6 +86,19 @@ void pqiQoS::in_rsItem(void *ptr,int priority)
void *pqiQoS::out_rsItem() void *pqiQoS::out_rsItem()
{
bool starts,ends ;
uint32_t packet_id,offset,size ;
void *res = out_rsItem(~0u,16,offset,size,starts,ends,packet_id) ;
if(!starts || !ends)
std::cerr << "(EE) protocol error in pqiQoS. Will eventually kill connection!" << std::endl;
return res ;
}
void *pqiQoS::out_rsItem(uint32_t max_slice_size,uint32_t offset_unit,uint32_t& offset,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id)
{ {
// Go through the queues. Increment counters. // Go through the queues. Increment counters.
@ -105,11 +123,26 @@ void *pqiQoS::out_rsItem()
if(last >= 0) if(last >= 0)
{ {
assert(_nb_items > 0) ; assert(_nb_items > 0) ;
--_nb_items ;
return _item_queues[last].pop(); // now chop a slice of this item
void *res = _item_queues[last].slice(max_slice_size,offset,size,starts,ends,packet_id) ;
if(ends)
--_nb_items ;
if( (offset % offset_unit) != 0)
std::cerr << "(EE) Severe error in pqiQoS::out_rsItem(). offset unit inconsistent with calculated offset." << std::endl;
offset /= offset_unit ;
return res ;
} }
else else
return NULL ; return NULL ;
} }

View File

@ -36,55 +36,128 @@
#include <stdint.h> #include <stdint.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h>
#include <iostream>
#include <vector> #include <vector>
#include <list> #include <list>
#include <util/rsmemory.h>
class pqiQoS class pqiQoS
{ {
public: public:
pqiQoS(uint32_t max_levels,float alpha) ; pqiQoS(uint32_t max_levels,float alpha) ;
struct ItemRecord
{
void *data ;
uint32_t current_offset ;
uint32_t size ;
uint32_t id ;
};
class ItemQueue class ItemQueue
{ {
public: public:
ItemQueue() ItemQueue()
{ {
_item_count =0 ; _item_count =0 ;
} }
void *pop() void *pop()
{
if(_items.empty())
return NULL ;
void *item = _items.front().data ;
_items.pop_front() ;
--_item_count ;
return item ;
}
void *slice(uint32_t max_size,uint32_t& offset,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id)
{
if(_items.empty())
return NULL ;
ItemRecord& rec(_items.front()) ;
packet_id = rec.id ;
// readily get rid of the item if it can be sent as a whole
if(rec.current_offset == 0 && rec.size < max_size)
{
offset = 0 ;
starts = true ;
ends = true ;
size = rec.size ;
return pop() ;
}
starts = (rec.current_offset == 0) ;
ends = (rec.current_offset + max_size > rec.size) ;
if(rec.size < rec.current_offset)
{ {
if(_items.empty()) std::cerr << "(EE) severe error in slicing in QoS." << std::endl;
return NULL ; pop() ;
return NULL ;
void *item = _items.front() ;
_items.pop_front() ;
--_item_count ;
return item ;
} }
void push(void *item) size = std::min(max_size, uint32_t((int)rec.size - (int)rec.current_offset)) ;
void *mem = rs_malloc(size) ;
if(!mem)
{ {
_items.push_back(item) ; std::cerr << "(EE) memory allocation error in QoS." << std::endl;
++_item_count ; pop() ;
} return NULL ;
}
uint32_t size() const { return _item_count ; } memcpy(mem,&((unsigned char*)rec.data)[rec.current_offset],size) ;
float _threshold ; if(ends) // we're taking the whole stuff. So we can delete the entry.
float _counter ; {
float _inc ; free(rec.data) ;
uint32_t _item_count ; _items.pop_front() ;
std::list<void*> _items ; }
else
rec.current_offset += size ;
return mem ;
}
void push(void *item,uint32_t size,uint32_t id)
{
ItemRecord rec ;
rec.data = item ;
rec.current_offset = 0 ;
rec.size = size ;
rec.id = id ;
_items.push_back(rec) ;
++_item_count ;
}
uint32_t size() const { return _item_count ; }
float _threshold ;
float _counter ;
float _inc ;
uint32_t _item_count ;
std::list<ItemRecord> _items ;
}; };
// This function pops items from the queue, y order of priority // This function pops items from the queue, y order of priority
// //
void *out_rsItem() ; void *out_rsItem() ;
void *out_rsItem(uint32_t max_slice_size,uint32_t offset_unit,uint32_t& offset,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id) ;
// This function is used to queue items. // This function is used to queue items.
// //
void in_rsItem(void *item,int priority) ; void in_rsItem(void *item, int size, int priority) ;
void print() const ; void print() const ;
uint64_t qos_queue_size() const { return _nb_items ; } uint64_t qos_queue_size() const { return _nb_items ; }
@ -105,6 +178,8 @@ class pqiQoS
std::vector<ItemQueue> _item_queues ; std::vector<ItemQueue> _item_queues ;
float _alpha ; float _alpha ;
uint64_t _nb_items ; uint64_t _nb_items ;
uint32_t _id_counter ;
}; };

View File

@ -52,10 +52,11 @@ int pqiQoSstreamer::getQueueSize(bool in)
void pqiQoSstreamer::locked_storeInOutputQueue(void *ptr,int priority) void pqiQoSstreamer::locked_storeInOutputQueue(void *ptr,int priority)
{ {
_total_item_size += getRsItemSize(ptr) ; uint32_t size = getRsItemSize(ptr) ;
_total_item_size += size ;
++_total_item_count ; ++_total_item_count ;
pqiQoS::in_rsItem(ptr,priority) ; pqiQoS::in_rsItem(ptr,size,priority) ;
} }
void pqiQoSstreamer::locked_clear_out_queue() void pqiQoSstreamer::locked_clear_out_queue()
@ -65,9 +66,9 @@ void pqiQoSstreamer::locked_clear_out_queue()
_total_item_count = 0 ; _total_item_count = 0 ;
} }
void *pqiQoSstreamer::locked_pop_out_data() void *pqiQoSstreamer::locked_pop_out_data(uint32_t max_slice_size,uint32_t offset_unit,uint32_t& offset,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id)
{ {
void *out = pqiQoS::out_rsItem() ; void *out = pqiQoS::out_rsItem(max_slice_size,offset_unit,offset,size,starts,ends,packet_id) ;
if(out != NULL) if(out != NULL)
{ {

View File

@ -40,7 +40,7 @@ class pqiQoSstreamer: public pqithreadstreamer, public pqiQoS
virtual int locked_out_queue_size() const { return _total_item_count ; } virtual int locked_out_queue_size() const { return _total_item_count ; }
virtual void locked_clear_out_queue() ; virtual void locked_clear_out_queue() ;
virtual int locked_compute_out_pkt_size() const { return _total_item_size ; } virtual int locked_compute_out_pkt_size() const { return _total_item_size ; }
virtual void *locked_pop_out_data() ; virtual void *locked_pop_out_data(uint32_t max_slice_size,uint32_t offset_unit,uint32_t& offset,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id);
//virtual int locked_gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const; // extracting data. //virtual int locked_gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const; // extracting data.

View File

@ -400,6 +400,37 @@ time_t pqistreamer::getLastIncomingTS()
return mLastIncomingTs; return mLastIncomingTs;
} }
// Packet slicing:
//
// Old : 02 0014 03 00000026 [data, 26 bytes] => [version 1B] [service 2B][subpacket 1B] [size 4B]
// New1: fv 0014 03 xxxxx sss [data, sss bytes] => [flags 0.5B version 0.5B] [service 2B][subpacket 1B] [packet counter 2.5B size 1.5B]
// New2: f xxxxxx ooo sssss [data, sss bytes] => [flags 0.5B] [2^24 packet count] [2^16 offset (in units of 16)] [size]
//
// Flags: 0x1 => incomplete packet continued after
// Flags: 0x2 => packet ending a previously incomplete packet
//
// - backward compatibility:
// * send one packet with service + subpacket = ffffff. Old peers will silently ignore such packets.
// * if received, mark the peer as able to decode the new packet type
//
// Mode 1:
// - Encode length on 1.5 Bytes (10 bits) => max slice size = 1024
// - Encode packet ID on 2.5 Bytes (20 bits) => packet counter = [0...1056364]
// Mode 2:
// - Encode flags on 0.5 Bytes ( 4 bits)
// - Encode packet ID on 3.5 Bytes (28 bits) => packet counter = [0...16777216]
// - Encode offset on 2.0 Bytes (16 bits) => 65536 * 16 = // ax packet size = 1048576
// - Encode size on 2.0 Bytes (16 bits) => 65536 // max slice size = 65536
//
// - limit packet grouping to max size 1024.
// - new peers need to read flux, and properly extract partial sizes, and combine packets based on packet counter.
// - on sending, RS should grab slices of max size 1024 from pqiQoS. If smaller, possibly pack them together.
// pqiQoS keeps track of sliced packets and makes sure the output is consistent:
// * when a large packet needs to be send, only takes a slice and return it, and update the remaining part
// * always consider priority when taking new slices => a newly arrived fast packet will always get through.
//
// Max slice size should be customisable, depending on bandwidth.
int pqistreamer::handleoutgoing_locked() int pqistreamer::handleoutgoing_locked()
{ {
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
@ -450,18 +481,16 @@ int pqistreamer::handleoutgoing_locked()
return 0; return 0;
} }
#define GROUP_OUTGOING_PACKETS 1 #define OPTIMAL_PACKET_SIZE 512
#define PACKET_GROUPING_SIZE_LIMIT 32768
// send a out_pkt., else send out_data. unless // send a out_pkt., else send out_data. unless
// there is a pending packet. // there is a pending packet.
if (!mPkt_wpending) if (!mPkt_wpending)
#ifdef GROUP_OUTGOING_PACKETS
{ {
void *dta; void *dta;
mPkt_wpending_size = 0 ; mPkt_wpending_size = 0 ;
int k=0; int k=0;
while(mPkt_wpending_size < (uint32_t)maxbytes && mPkt_wpending_size < PACKET_GROUPING_SIZE_LIMIT && (dta = locked_pop_out_data())!=NULL ) while(mPkt_wpending_size < (uint32_t)maxbytes && mPkt_wpending_size < OPTIMAL_PACKET_SIZE && (dta = locked_pop_out_data())!=NULL )
{ {
uint32_t s = getRsItemSize(dta); uint32_t s = getRsItemSize(dta);
mPkt_wpending = realloc(mPkt_wpending,s+mPkt_wpending_size) ; mPkt_wpending = realloc(mPkt_wpending,s+mPkt_wpending_size) ;
@ -475,17 +504,7 @@ int pqistreamer::handleoutgoing_locked()
std::cerr << "Packed " << k << " packets into " << mPkt_wpending_size << " bytes." << std::endl; std::cerr << "Packed " << k << " packets into " << mPkt_wpending_size << " bytes." << std::endl;
#endif #endif
} }
#else
{
void *dta = locked_pop_out_data() ;
if(dta != NULL)
{
mPkt_wpending = dta ;
mPkt_wpending_size = getRsItemSize(dta);
}
}
#endif
if (mPkt_wpending) if (mPkt_wpending)
{ {
RsScopeTimer tmer("pqistreamer:"+PeerId().toStdString()) ; RsScopeTimer tmer("pqistreamer:"+PeerId().toStdString()) ;
@ -804,6 +823,7 @@ continue_packet:
std::cerr << "[" << (void*)pthread_self() << "] " << "deserializing. Size=" << pktlen << std::endl ; std::cerr << "[" << (void*)pthread_self() << "] " << "deserializing. Size=" << pktlen << std::endl ;
#endif #endif
std::cerr << "Got packet with header " << RsUtil::BinToHex((char*)block,8) << std::endl;
RsItem *pkt = mRsSerialiser->deserialise(block, &pktlen); RsItem *pkt = mRsSerialiser->deserialise(block, &pktlen);
if ((pkt != NULL) && (0 < handleincomingitem_locked(pkt,pktlen))) if ((pkt != NULL) && (0 < handleincomingitem_locked(pkt,pktlen)))