From cad8c89746ae3e7c37f060986f54f9ca7763f429 Mon Sep 17 00:00:00 2001 From: csoler Date: Sat, 23 Apr 2016 17:10:25 -0400 Subject: [PATCH] added packet slicing in pqistreamer --- libretroshare/src/pqi/pqiqosstreamer.cc | 3 +- libretroshare/src/pqi/pqiqosstreamer.h | 2 +- libretroshare/src/pqi/pqistreamer.cc | 166 +++++++++++++++++++----- libretroshare/src/pqi/pqistreamer.h | 10 +- 4 files changed, 144 insertions(+), 37 deletions(-) diff --git a/libretroshare/src/pqi/pqiqosstreamer.cc b/libretroshare/src/pqi/pqiqosstreamer.cc index 5470b55b8..c18388707 100644 --- a/libretroshare/src/pqi/pqiqosstreamer.cc +++ b/libretroshare/src/pqi/pqiqosstreamer.cc @@ -50,9 +50,8 @@ int pqiQoSstreamer::getQueueSize(bool in) // return pqiQoS::gatherStatistics(per_service_count,per_priority_count) ; //} -void pqiQoSstreamer::locked_storeInOutputQueue(void *ptr,int priority) +void pqiQoSstreamer::locked_storeInOutputQueue(void *ptr,int size,int priority) { - uint32_t size = getRsItemSize(ptr) ; _total_item_size += size ; ++_total_item_count ; diff --git a/libretroshare/src/pqi/pqiqosstreamer.h b/libretroshare/src/pqi/pqiqosstreamer.h index da5cba923..5ea3df65e 100644 --- a/libretroshare/src/pqi/pqiqosstreamer.h +++ b/libretroshare/src/pqi/pqiqosstreamer.h @@ -36,7 +36,7 @@ class pqiQoSstreamer: public pqithreadstreamer, public pqiQoS static const uint32_t PQI_QOS_STREAMER_MAX_LEVELS = 10 ; static const float PQI_QOS_STREAMER_ALPHA ; - virtual void locked_storeInOutputQueue(void *ptr,int priority) ; + virtual void locked_storeInOutputQueue(void *ptr, int size, int priority) ; virtual int locked_out_queue_size() const { return _total_item_count ; } virtual void locked_clear_out_queue() ; virtual int locked_compute_out_pkt_size() const { return _total_item_size ; } diff --git a/libretroshare/src/pqi/pqistreamer.cc b/libretroshare/src/pqi/pqistreamer.cc index 37bdbe065..504cf14cb 100644 --- a/libretroshare/src/pqi/pqistreamer.cc +++ b/libretroshare/src/pqi/pqistreamer.cc @@ -39,9 +39,16 @@ const int pqistreamerzone = 8221; -static const int PQISTREAM_ABS_MAX = 100000000; /* 100 MB/sec (actually per loop) */ -static const int PQISTREAM_AVG_PERIOD = 5; // update speed estimate every 5 seconds -static const float PQISTREAM_AVG_FRAC = 0.8; // for bandpass filter over speed estimate. +static const int PQISTREAM_ABS_MAX = 100000000; /* 100 MB/sec (actually per loop) */ +static const int PQISTREAM_AVG_PERIOD = 5; // update speed estimate every 5 seconds +static const float PQISTREAM_AVG_FRAC = 0.8; // for bandpass filter over speed estimate. +static const int PQISTREAM_OPTIMAL_PACKET_SIZE = 512; // It is believed that this value should be lower than TCP slices and large enough as compare to encryption padding. + // most importantly, it should be constant, so as to allow correct QoS. +static const int PQISTREAM_OPTIMAL_SLICE_OFFSET_UNIT = 16 ; // slices offset in units of 16 bits. That allows bigger numbers encoded in 4 less bits. +static const int PQISTREAM_SLICE_FLAG_ENDS = 0x01; // these flags should be kept in the range 0x01-0x08 +static const int PQISTREAM_SLICE_FLAG_STARTS = 0x02; // +static const int PQISTREAM_SLICE_PROTOCOL_VERSION_ID = 0x10; // Protocol version ID. Should hold on the 4 lower bits. +static const int PQISTREAM_PARTIAL_PACKET_HEADER_SIZE = 8; // Same size than normal header, to make the code simpler. /* This removes the print statements (which hammer pqidebug) */ /*** @@ -279,7 +286,7 @@ int pqistreamer::status() return 0; } -void pqistreamer::locked_storeInOutputQueue(void *ptr,int) +void pqistreamer::locked_storeInOutputQueue(void *ptr,int,int) { mOutPkts.push_back(ptr); } @@ -317,7 +324,7 @@ int pqistreamer::queue_outpqi_locked(RsItem *pqi,uint32_t& pktsize) if (mRsSerialiser->serialise(pqi, ptr, &pktsize)) { - locked_storeInOutputQueue(ptr,pqi->priority_level()) ; + locked_storeInOutputQueue(ptr,pktsize,pqi->priority_level()) ; if (!(mBio_flags & BIN_FLAGS_NO_DELETE)) { @@ -404,7 +411,7 @@ time_t pqistreamer::getLastIncomingTS() // // Old : 02 0014 03 00000026 [data, 26 bytes] => [version 1B] [service 2B][subpacket 1B] [size 4B] // New1: fv 0014 03 xxxxx sss [data, sss bytes] => [flags 0.5B version 0.5B] [service 2B][subpacket 1B] [packet counter 2.5B size 1.5B] -// New2: f xxxxxx ooo sssss [data, sss bytes] => [flags 0.5B] [2^24 packet count] [2^16 offset (in units of 16)] [size] +// New2: fx xxxxxx oooo ssss [data, sss bytes] => [flags 0.5B] [2^28 packet count] [2^16 offset (in units of 16)] [size 2^16] // // Flags: 0x1 => incomplete packet continued after // Flags: 0x2 => packet ending a previously incomplete packet @@ -481,29 +488,83 @@ int pqistreamer::handleoutgoing_locked() return 0; } -#define OPTIMAL_PACKET_SIZE 512 - // send a out_pkt., else send out_data. unless - // there is a pending packet. - if (!mPkt_wpending) - { - void *dta; - mPkt_wpending_size = 0 ; - int k=0; + // send a out_pkt., else send out_data. unless there is a pending packet. The strategy is to + // - grab as many packets as possible while below the optimal packet size, so as to allow some packing and decrease encryption padding overhead (suposeddly) + // - limit packets size to OPTIMAL_PACKET_SIZE when sending big packets so as to keep as much QoS as possible. - while(mPkt_wpending_size < (uint32_t)maxbytes && mPkt_wpending_size < OPTIMAL_PACKET_SIZE && (dta = locked_pop_out_data())!=NULL ) - { - uint32_t s = getRsItemSize(dta); - mPkt_wpending = realloc(mPkt_wpending,s+mPkt_wpending_size) ; - memcpy( &((char*)mPkt_wpending)[mPkt_wpending_size],dta,s) ; + if (!mPkt_wpending) + { + void *dta; + mPkt_wpending_size = 0 ; + int k=0; + + uint32_t slice_offset =0 ; + uint32_t slice_size=0; + bool slice_starts=true ; + bool slice_ends=true ; + uint32_t slice_packet_id=0 ; + + do + { + dta = locked_pop_out_data(PQISTREAM_OPTIMAL_PACKET_SIZE,PQISTREAM_OPTIMAL_SLICE_OFFSET_UNIT,slice_offset,slice_size,slice_starts,slice_ends,slice_packet_id) ; + + if(!dta) + break ; + + if(slice_size > 0xffff) + { + std::cerr << "(EE) protocol error in pqitreamer: slice size is too large and cannot be encoded." ; + free(mPkt_wpending) ; + mPkt_wpending_size = 0; + } + + if(slice_offset > 0xfffff || (slice_offset & 0xff)!=0) // 5 f, on purpose. Not a bug. + { + std::cerr << "(EE) protocol error in pqitreamer: slice size is too large and cannot be encoded." ; + free(mPkt_wpending) ; + mPkt_wpending_size = 0; + } + + if(slice_starts && slice_ends) // good old method. Send the packet as is, since it's a full packet. + { + mPkt_wpending = realloc(mPkt_wpending,slice_size+mPkt_wpending_size) ; + memcpy( &((char*)mPkt_wpending)[mPkt_wpending_size],dta,slice_size) ; free(dta); - mPkt_wpending_size += s ; + mPkt_wpending_size += slice_size ; ++k ; - } + } + else // partial packet. We make a special header for it and insert it in the stream + { + mPkt_wpending = realloc(mPkt_wpending,slice_size+mPkt_wpending_size+PQISTREAM_PARTIAL_PACKET_HEADER_SIZE) ; + memcpy( &((char*)mPkt_wpending)[mPkt_wpending_size+PQISTREAM_PARTIAL_PACKET_HEADER_SIZE],dta,slice_size) ; + free(dta); + + // New2: fp xxxxxx oooo ssss [data, sss bytes] => [flags 0.5B] [protocol version 0.5B] [2^24 packet count] [2^16 offset (in units of 16)] [size 2^16] + + uint8_t partial_flags = PQISTREAM_SLICE_PROTOCOL_VERSION_ID ; // includes version. Flags are in the first half-byte + if(slice_starts) partial_flags |= PQISTREAM_SLICE_FLAG_STARTS ; + if(slice_ends ) partial_flags |= PQISTREAM_SLICE_FLAG_ENDS ; + + ((char*)mPkt_wpending)[mPkt_wpending_size+0x00] = partial_flags ; + ((char*)mPkt_wpending)[mPkt_wpending_size+0x01] = uint8_t(slice_packet_id >> 16) & 0xff ; + ((char*)mPkt_wpending)[mPkt_wpending_size+0x02] = uint8_t(slice_packet_id >> 8) & 0xff ; + ((char*)mPkt_wpending)[mPkt_wpending_size+0x03] = uint8_t(slice_packet_id >> 0) & 0xff ; + ((char*)mPkt_wpending)[mPkt_wpending_size+0x04] = uint8_t(slice_offset >> 12) & 0xff ; + ((char*)mPkt_wpending)[mPkt_wpending_size+0x05] = uint8_t(slice_offset >> 4) & 0xff ; // not a bug. The last 4 bits are discarded because they are always 0 + ((char*)mPkt_wpending)[mPkt_wpending_size+0x06] = uint8_t(slice_size >> 8) & 0xff ; + ((char*)mPkt_wpending)[mPkt_wpending_size+0x07] = uint8_t(slice_size >> 0) & 0xff ; + + mPkt_wpending_size += slice_size + PQISTREAM_PARTIAL_PACKET_HEADER_SIZE; + ++k ; + } + } + while(mPkt_wpending_size < (uint32_t)maxbytes && mPkt_wpending_size < PQISTREAM_OPTIMAL_PACKET_SIZE ) ; + #ifdef DEBUG_PQISTREAMER - if(k > 1) - std::cerr << "Packed " << k << " packets into " << mPkt_wpending_size << " bytes." << std::endl; + if(k > 1) + std::cerr << "Packed " << k << " packets into " << mPkt_wpending_size << " bytes." << std::endl; #endif - } + } if (mPkt_wpending) { @@ -581,7 +642,7 @@ int pqistreamer::handleincoming_locked() void *block = mPkt_rpending; // initial read size: basic packet. - int blen = getRsPktBaseSize(); + int blen = getRsPktBaseSize(); // this is valid for both packet slices and normal un-sliced packets (same header size) int maxin = inAllowedBytes_locked(); @@ -657,7 +718,23 @@ start_packet_read: continue_packet: { // workout how much more to read. - int extralen = getRsItemSize(block) - blen; + + bool is_partial_packet = false ; + + int extralen =0; + int slice_offset = 0 ; + int slice_packet_id =0; + + if( ((char*)block)[0] == 0x10 || ((char*)block)[0] == 0x11 || ((char*)block)[0] == 0x12) + { + extralen = (int(((char*)block)[6]) << 8) + (int(((char*)block)[7])); + slice_offset = (int(((char*)block)[5]) << 4) + (int(((char*)block)[4]) << 12); + slice_packet_id = (int(((char*)block)[3]) << 0) + (int(((char*)block)[2]) << 8) + (int(((char*)block)[1]) << 16); + + is_partial_packet = true ; + } + else + extralen = getRsItemSize(block) - blen; #ifdef DEBUG_PQISTREAMER std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet getRsItemSize(block) = " << getRsItemSize(block) << std::endl ; @@ -822,9 +899,19 @@ continue_packet: #ifdef DEBUG_PQISTREAMER std::cerr << "[" << (void*)pthread_self() << "] " << "deserializing. Size=" << pktlen << std::endl ; #endif + RsItem *pkt ; + + if(is_partial_packet) + { + std::cerr << "Inputing partial packet " << RsUtil::BinToHex((char*)block,8) << std::endl; + + pkt = addPartialPacket(block,extralen,slice_offset,slice_packet_id) ; + } + else + pkt = mRsSerialiser->deserialise(block, &pktlen); + std::cerr << "Got packet with header " << RsUtil::BinToHex((char*)block,8) << std::endl; - RsItem *pkt = mRsSerialiser->deserialise(block, &pktlen); if ((pkt != NULL) && (0 < handleincomingitem_locked(pkt,pktlen))) { @@ -833,7 +920,7 @@ continue_packet: #endif inReadBytes_locked(pktlen); // only count deserialised packets, because that's what is actually been transfered. } - else + else if (!is_partial_packet) { #ifdef DEBUG_PQISTREAMER pqioutput(PQL_ALERT, pqistreamerzone, "Failed to handle Packet!"); @@ -864,6 +951,19 @@ continue_packet: return 0; } +RsItem *pqistreamer::addPartialPacket(const void *block,uint32_t len,uint32_t slice_offset,uint32_t slice_packet_id) +{ + std::map::iterator it = mPartialPackets.find(slice_packet_id) ; + + if(it == mPartialPackets.end()) + { + // make sure we really have starting packet. Otherwise this is an error. + } + else + { + // make sure this is a continuing packet, otherwise this is an error. + } +} /* BandWidth Management Assistance */ @@ -1094,8 +1194,14 @@ int pqistreamer::locked_gatherStatistics(std::list& out_lst,std:: return 1 ; } -void *pqistreamer::locked_pop_out_data() +void *pqistreamer::locked_pop_out_data(uint32_t max_slice_size,uint32_t offset_unit,uint32_t& offset,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id) { + offset = 0 ; + size = 0 ; + starts = true ; + ends = true ; + packet_id = 0 ; + void *res = NULL ; if (!mOutPkts.empty()) @@ -1109,3 +1215,5 @@ void *pqistreamer::locked_pop_out_data() } return res ; } + + diff --git a/libretroshare/src/pqi/pqistreamer.h b/libretroshare/src/pqi/pqistreamer.h index ff1cf0a12..47c8540df 100644 --- a/libretroshare/src/pqi/pqistreamer.h +++ b/libretroshare/src/pqi/pqistreamer.h @@ -71,13 +71,12 @@ class pqistreamer: public PQInterface // These methods are redefined in pqiQoSstreamer // - virtual void locked_storeInOutputQueue(void *ptr,int priority) ; + virtual void locked_storeInOutputQueue(void *ptr, int size, int priority) ; virtual int locked_out_queue_size() const ; virtual void locked_clear_out_queue() ; virtual int locked_compute_out_pkt_size() const ; - virtual void *locked_pop_out_data() ; - //virtual int locked_gatherStatistics(std::vector& per_service_count,std::vector& per_priority_count) const; // extracting data. - virtual int locked_gatherStatistics(std::list& outqueue_stats,std::list& inqueue_stats); // extracting data. + virtual void *locked_pop_out_data(uint32_t max_slice_size,uint32_t offset_unit,uint32_t& offset,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id); + virtual int locked_gatherStatistics(std::list& outqueue_stats,std::list& inqueue_stats); // extracting data. void updateRates() ; @@ -156,7 +155,8 @@ class pqistreamer: public PQInterface std::list mCurrentStatsChunk_Out ; time_t mStatisticsTimeStamp ; - void locked_addTrafficClue(const RsItem *pqi, uint32_t pktsize, std::list &lst); + void locked_addTrafficClue(const RsItem *pqi, uint32_t pktsize, std::list &lst); + RsItem *addPartialPacket(const void *block, uint32_t len, uint32_t slice_offset, uint32_t slice_packet_id); }; #endif //MRK_PQI_STREAMER_HEADER