diff --git a/libretroshare/src/pqi/pqiqos.cc b/libretroshare/src/pqi/pqiqos.cc index b0d5ec5f5..b2472c0b7 100644 --- a/libretroshare/src/pqi/pqiqos.cc +++ b/libretroshare/src/pqi/pqiqos.cc @@ -6,6 +6,8 @@ #include "pqiqos.h" +const uint32_t pqiQoS::MAX_PACKET_COUNTER_VALUE = (1 << 24) ; + pqiQoS::pqiQoS(uint32_t nb_levels,float alpha) : _item_queues(nb_levels),_alpha(alpha) { @@ -14,6 +16,7 @@ pqiQoS::pqiQoS(uint32_t nb_levels,float alpha) float c = 1.0f ; float inc = alpha ; _nb_items = 0 ; + _id_counter = 0 ; for(int i=((int)nb_levels)-1;i>=0;--i,c *= alpha) { @@ -44,7 +47,7 @@ void pqiQoS::print() const std::cerr << std::endl; } -void pqiQoS::in_rsItem(void *ptr,int priority) +void pqiQoS::in_rsItem(void *ptr,int size,int priority) { if(uint32_t(priority) >= _item_queues.size()) { @@ -52,8 +55,11 @@ void pqiQoS::in_rsItem(void *ptr,int priority) priority = _item_queues.size()-1 ; } - _item_queues[priority].push(ptr) ; + _item_queues[priority].push(ptr,size,_id_counter++) ; ++_nb_items ; + + if(_id_counter >= MAX_PACKET_COUNTER_VALUE) + _id_counter = 0 ; } // int pqiQoS::gatherStatistics(std::vector& per_service_count,std::vector& per_priority_count) const @@ -80,7 +86,7 @@ void pqiQoS::in_rsItem(void *ptr,int priority) // } -void *pqiQoS::out_rsItem() +void *pqiQoS::out_rsItem(uint32_t max_slice_size, uint32_t& size, bool& starts, bool& ends, uint32_t& packet_id) { // Go through the queues. Increment counters. @@ -105,11 +111,21 @@ void *pqiQoS::out_rsItem() if(last >= 0) { assert(_nb_items > 0) ; - --_nb_items ; - return _item_queues[last].pop(); + + // now chop a slice of this item + + void *res = _item_queues[last].slice(max_slice_size,size,starts,ends,packet_id) ; + + if(ends) + --_nb_items ; + + return res ; } else return NULL ; } + + + diff --git a/libretroshare/src/pqi/pqiqos.h b/libretroshare/src/pqi/pqiqos.h index 0b7939f80..a83ec1445 100644 --- a/libretroshare/src/pqi/pqiqos.h +++ b/libretroshare/src/pqi/pqiqos.h @@ -36,75 +36,148 @@ #include #include +#include +#include #include #include +#include + class pqiQoS { +public: + pqiQoS(uint32_t max_levels,float alpha) ; + + struct ItemRecord + { + void *data ; + uint32_t current_offset ; + uint32_t size ; + uint32_t id ; + }; + + class ItemQueue + { public: - pqiQoS(uint32_t max_levels,float alpha) ; - - class ItemQueue + ItemQueue() { - public: - ItemQueue() - { - _item_count =0 ; - } - void *pop() - { - if(_items.empty()) - return NULL ; + _item_count =0 ; + } + void *pop() + { + if(_items.empty()) + return NULL ; - void *item = _items.front() ; - _items.pop_front() ; - --_item_count ; + void *item = _items.front().data ; + _items.pop_front() ; + --_item_count ; - return item ; - } + return item ; + } - void push(void *item) - { - _items.push_back(item) ; - ++_item_count ; - } + void *slice(uint32_t max_size,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id) + { + if(_items.empty()) + return NULL ; - uint32_t size() const { return _item_count ; } + ItemRecord& rec(_items.front()) ; + packet_id = rec.id ; - float _threshold ; - float _counter ; - float _inc ; - uint32_t _item_count ; - std::list _items ; - }; + // readily get rid of the item if it can be sent as a whole - // This function pops items from the queue, y order of priority - // - void *out_rsItem() ; + if(rec.current_offset == 0 && rec.size < max_size) + { + starts = true ; + ends = true ; + size = rec.size ; - // This function is used to queue items. - // - void in_rsItem(void *item,int priority) ; + return pop() ; + } + starts = (rec.current_offset == 0) ; + ends = (rec.current_offset + max_size >= rec.size) ; - void print() const ; - uint64_t qos_queue_size() const { return _nb_items ; } + if(rec.size <= rec.current_offset) + { + std::cerr << "(EE) severe error in slicing in QoS." << std::endl; + pop() ; + return NULL ; + } - // kills all waiting items. - void clear() ; + size = std::min(max_size, uint32_t((int)rec.size - (int)rec.current_offset)) ; + void *mem = rs_malloc(size) ; - // get some stats about what's going on. service_packets will contain the number of - // packets per service, and queue_sizes will contain the size of the different priority queues. + if(!mem) + { + std::cerr << "(EE) memory allocation error in QoS." << std::endl; + pop() ; + return NULL ; + } - //int gatherStatistics(std::vector& per_service_count,std::vector& per_priority_count) const ; + memcpy(mem,&((unsigned char*)rec.data)[rec.current_offset],size) ; - void computeTotalItemSize() const ; - int debug_computeTotalItemSize() const ; - private: - // This vector stores the lists of items with equal priorities. - // - std::vector _item_queues ; - float _alpha ; - uint64_t _nb_items ; + if(ends) // we're taking the whole stuff. So we can delete the entry. + { + free(rec.data) ; + _items.pop_front() ; + } + else + rec.current_offset += size ; // by construction, !ends implies rec.current_offset < rec.size + + return mem ; + } + + void push(void *item,uint32_t size,uint32_t id) + { + ItemRecord rec ; + + rec.data = item ; + rec.current_offset = 0 ; + rec.size = size ; + rec.id = id ; + + _items.push_back(rec) ; + } + + uint32_t size() const { return _item_count ; } + + float _threshold ; + float _counter ; + float _inc ; + uint32_t _item_count ; + + std::list _items ; + }; + + // This function pops items from the queue, y order of priority + // + void *out_rsItem(uint32_t max_slice_size,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id) ; + + // This function is used to queue items. + // + void in_rsItem(void *item, int size, int priority) ; + + void print() const ; + uint64_t qos_queue_size() const { return _nb_items ; } + + // kills all waiting items. + void clear() ; + + // get some stats about what's going on. service_packets will contain the number of + // packets per service, and queue_sizes will contain the size of the different priority queues. + + //int gatherStatistics(std::vector& per_service_count,std::vector& per_priority_count) const ; + + void computeTotalItemSize() const ; + int debug_computeTotalItemSize() const ; +private: + // This vector stores the lists of items with equal priorities. + // + std::vector _item_queues ; + float _alpha ; + uint64_t _nb_items ; + uint32_t _id_counter ; + + static const uint32_t MAX_PACKET_COUNTER_VALUE ; }; diff --git a/libretroshare/src/pqi/pqiqosstreamer.cc b/libretroshare/src/pqi/pqiqosstreamer.cc index fc0dc2084..c9cc7d570 100644 --- a/libretroshare/src/pqi/pqiqosstreamer.cc +++ b/libretroshare/src/pqi/pqiqosstreamer.cc @@ -50,12 +50,12 @@ int pqiQoSstreamer::getQueueSize(bool in) // return pqiQoS::gatherStatistics(per_service_count,per_priority_count) ; //} -void pqiQoSstreamer::locked_storeInOutputQueue(void *ptr,int priority) +void pqiQoSstreamer::locked_storeInOutputQueue(void *ptr,int size,int priority) { - _total_item_size += getRsItemSize(ptr) ; + _total_item_size += size ; ++_total_item_count ; - pqiQoS::in_rsItem(ptr,priority) ; + pqiQoS::in_rsItem(ptr,size,priority) ; } void pqiQoSstreamer::locked_clear_out_queue() @@ -65,14 +65,16 @@ void pqiQoSstreamer::locked_clear_out_queue() _total_item_count = 0 ; } -void *pqiQoSstreamer::locked_pop_out_data() +void *pqiQoSstreamer::locked_pop_out_data(uint32_t max_slice_size, uint32_t& size, bool& starts, bool& ends, uint32_t& packet_id) { - void *out = pqiQoS::out_rsItem() ; + void *out = pqiQoS::out_rsItem(max_slice_size,size,starts,ends,packet_id) ; if(out != NULL) { _total_item_size -= getRsItemSize(out) ; - --_total_item_count ; + + if(ends) + --_total_item_count ; } return out ; diff --git a/libretroshare/src/pqi/pqiqosstreamer.h b/libretroshare/src/pqi/pqiqosstreamer.h index 574ba94d6..0144cbdb8 100644 --- a/libretroshare/src/pqi/pqiqosstreamer.h +++ b/libretroshare/src/pqi/pqiqosstreamer.h @@ -36,11 +36,11 @@ class pqiQoSstreamer: public pqithreadstreamer, public pqiQoS static const uint32_t PQI_QOS_STREAMER_MAX_LEVELS = 10 ; static const float PQI_QOS_STREAMER_ALPHA ; - virtual void locked_storeInOutputQueue(void *ptr,int priority) ; + virtual void locked_storeInOutputQueue(void *ptr, int size, int priority) ; virtual int locked_out_queue_size() const { return _total_item_count ; } virtual void locked_clear_out_queue() ; virtual int locked_compute_out_pkt_size() const { return _total_item_size ; } - virtual void *locked_pop_out_data() ; + virtual void *locked_pop_out_data(uint32_t max_slice_size,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id); //virtual int locked_gatherStatistics(std::vector& per_service_count,std::vector& per_priority_count) const; // extracting data. diff --git a/libretroshare/src/pqi/pqissl.cc b/libretroshare/src/pqi/pqissl.cc index 15c088627..e729534bc 100644 --- a/libretroshare/src/pqi/pqissl.cc +++ b/libretroshare/src/pqi/pqissl.cc @@ -1857,7 +1857,11 @@ bool pqissl::moretoread(uint32_t usec) #endif return 1; } - else + else if(SSL_pending(ssl_connection) > 0) + { + return 1 ; + } + else { #ifdef PQISSL_DEBUG rslog(RSL_DEBUG_ALL, pqisslzone, diff --git a/libretroshare/src/pqi/pqistreamer.cc b/libretroshare/src/pqi/pqistreamer.cc index 094d9d42a..8bf2fafb5 100644 --- a/libretroshare/src/pqi/pqistreamer.cc +++ b/libretroshare/src/pqi/pqistreamer.cc @@ -30,6 +30,7 @@ #include "util/rsdebug.h" #include "util/rsstring.h" #include "util/rsprint.h" +#include "util/rsscopetimer.h" #include "pqi/pqistreamer.h" #include "rsserver/p3face.h" @@ -38,15 +39,29 @@ const int pqistreamerzone = 8221; -static const int PQISTREAM_ABS_MAX = 100000000; /* 100 MB/sec (actually per loop) */ -static const int PQISTREAM_AVG_PERIOD = 5; // update speed estimate every 5 seconds -static const float PQISTREAM_AVG_FRAC = 0.8; // for bandpass filter over speed estimate. +static const int PQISTREAM_ABS_MAX = 100000000; /* 100 MB/sec (actually per loop) */ +static const int PQISTREAM_AVG_PERIOD = 5; // update speed estimate every 5 seconds +static const float PQISTREAM_AVG_FRAC = 0.8; // for bandpass filter over speed estimate. +static const int PQISTREAM_OPTIMAL_PACKET_SIZE = 512; // It is believed that this value should be lower than TCP slices and large enough as compare to encryption padding. + // most importantly, it should be constant, so as to allow correct QoS. +static const int PQISTREAM_SLICE_FLAG_STARTS = 0x01; // +static const int PQISTREAM_SLICE_FLAG_ENDS = 0x02; // these flags should be kept in the range 0x01-0x08 +static const int PQISTREAM_SLICE_PROTOCOL_VERSION_ID_01 = 0x10; // Protocol version ID. Should hold on the 4 lower bits. +static const int PQISTREAM_PARTIAL_PACKET_HEADER_SIZE = 8; // Same size than normal header, to make the code simpler. +static const int PQISTREAM_PACKET_SLICING_PROBE_DELAY = 60; // send every 60 secs. + +// This is a probe packet, that won't deserialise (it's empty) but will not cause problems to old peers either, since they will ignore +// it. This packet however will be understood by new peers as a signal to enable packet slicing. This should go when all peers use the +// same protocol. + +static uint8_t PACKET_SLICING_PROBE_BYTES[8] = { 0x02, 0xaa, 0xbb, 0xcc, 0x00, 0x00, 0x00, 0x08 } ; /* This removes the print statements (which hammer pqidebug) */ /*** #define RSITEM_DEBUG 1 #define DEBUG_TRANSFERS 1 #define DEBUG_PQISTREAMER 1 +#define DEBUG_PACKET_SLICING 1 ***/ #ifdef DEBUG_TRANSFERS @@ -64,6 +79,9 @@ pqistreamer::pqistreamer(RsSerialiser *rss, const RsPeerId& id, BinInterface *bi { RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ + mAcceptsPacketSlicing = false ; // by default. Will be turned into true when everyone's ready. + mLastSentPacketSlicingProbe = 0 ; + mAvgLastUpdate = mCurrReadTS = mCurrSentTS = time(NULL); mIncomingSize = 0 ; @@ -95,7 +113,9 @@ pqistreamer::pqistreamer(RsSerialiser *rss, const RsPeerId& id, BinInterface *bi pqistreamer::~pqistreamer() { RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ - +#ifdef DEBUG_PQISTREAMER + std::cerr << "Closing pqistreamer." << std::endl; +#endif pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::~pqistreamer() Destruction!"); if (mBio_flags & BIN_FLAGS_NO_CLOSE) @@ -185,8 +205,8 @@ void pqistreamer::updateRates() { int64_t diff = int64_t(t) - int64_t(mAvgLastUpdate) ; - float avgReadpSec = getRate(true) * PQISTREAM_AVG_FRAC + (1.0 - PQISTREAM_AVG_FRAC) * mAvgReadCount/(1000.0 * float(diff)); - float avgSentpSec = getRate(false) * PQISTREAM_AVG_FRAC + (1.0 - PQISTREAM_AVG_FRAC) * mAvgSentCount/(1000.0 * float(diff)); + float avgReadpSec = getRate(true ) * PQISTREAM_AVG_FRAC + (1.0 - PQISTREAM_AVG_FRAC) * mAvgReadCount/(1000.0 * float(diff)); + float avgSentpSec = getRate(false) * PQISTREAM_AVG_FRAC + (1.0 - PQISTREAM_AVG_FRAC) * mAvgSentCount/(1000.0 * float(diff)); #ifdef DEBUG_PQISTREAMER std::cerr << "Peer " << PeerId() << ": Current speed estimates: " << avgReadpSec << " / " << avgSentpSec << std::endl; @@ -256,6 +276,7 @@ int pqistreamer::tick_send(uint32_t timeout) { handleoutgoing_locked(); } + return 1; } @@ -278,7 +299,7 @@ int pqistreamer::status() return 0; } -void pqistreamer::locked_storeInOutputQueue(void *ptr,int) +void pqistreamer::locked_storeInOutputQueue(void *ptr,int,int) { mOutPkts.push_back(ptr); } @@ -316,7 +337,7 @@ int pqistreamer::queue_outpqi_locked(RsItem *pqi,uint32_t& pktsize) if (mRsSerialiser->serialise(pqi, ptr, &pktsize)) { - locked_storeInOutputQueue(ptr,pqi->priority_level()) ; + locked_storeInOutputQueue(ptr,pktsize,pqi->priority_level()) ; if (!(mBio_flags & BIN_FLAGS_NO_DELETE)) { @@ -399,6 +420,37 @@ time_t pqistreamer::getLastIncomingTS() return mLastIncomingTs; } +// Packet slicing: +// +// Old : 02 0014 03 00000026 [data, 26 bytes] => [version 1B] [service 2B][subpacket 1B] [size 4B] +// New1: fv 0014 03 xxxxx sss [data, sss bytes] => [flags 0.5B version 0.5B] [service 2B][subpacket 1B] [packet counter 2.5B size 1.5B] +// New2: pp ff xxxxxxxx ssss [data, sss bytes] => [flags 1B] [protocol version 1B] [2^32 packet count] [2^16 size] +// +// Flags: 0x1 => incomplete packet continued after +// Flags: 0x2 => packet ending a previously incomplete packet +// +// - backward compatibility: +// * send one packet with service + subpacket = ffffff. Old peers will silently ignore such packets. +// * if received, mark the peer as able to decode the new packet type +// +// Mode 1: +// - Encode length on 1.5 Bytes (10 bits) => max slice size = 1024 +// - Encode packet ID on 2.5 Bytes (20 bits) => packet counter = [0...1056364] +// Mode 2: +// - Encode protocol on 1.0 Bytes ( 8 bits) +// - Encode flags on 1.0 Bytes ( 8 bits) +// - Encode packet ID on 4.0 Bytes (32 bits) => packet counter = [0...2^32] +// - Encode size on 2.0 Bytes (16 bits) => 65536 // max slice size = 65536 +// +// - limit packet grouping to max size 1024. +// - new peers need to read flux, and properly extract partial sizes, and combine packets based on packet counter. +// - on sending, RS should grab slices of max size 1024 from pqiQoS. If smaller, possibly pack them together. +// pqiQoS keeps track of sliced packets and makes sure the output is consistent: +// * when a large packet needs to be send, only takes a slice and return it, and update the remaining part +// * always consider priority when taking new slices => a newly arrived fast packet will always get through. +// +// Max slice size should be customisable, depending on bandwidth. + int pqistreamer::handleoutgoing_locked() { #ifdef DEBUG_PQISTREAMER @@ -417,6 +469,10 @@ int pqistreamer::handleoutgoing_locked() { /* if we are not active - clear anything in the queues. */ locked_clear_out_queue() ; +#ifdef DEBUG_PACKET_SLICING + std::cerr << "(II) Switching off packet slicing." << std::endl; +#endif + mAcceptsPacketSlicing = false ; /* also remove the pending packets */ if (mPkt_wpending) @@ -440,62 +496,116 @@ int pqistreamer::handleoutgoing_locked() if ((!(mBio->cansend(0))) || (maxbytes < sentbytes)) { -#ifdef DEBUG_TRANSFERS +#ifdef DEBUG_PACKET_SLICING if (maxbytes < sentbytes) - { - std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending sentbytes > maxbytes. Sent " << sentbytes << " bytes "; - std::cerr << std::endl; - } + std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending: bio not ready. maxbytes=" << maxbytes << ", sentbytes=" << sentbytes << std::endl; else - { - std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending at cansend() is false"; - std::cerr << std::endl; - } + std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending: sentbytes=" << sentbytes << ", max=" << maxbytes << std::endl; #endif return 0; } -#define GROUP_OUTGOING_PACKETS 1 -#define PACKET_GROUPING_SIZE_LIMIT 512 - // send a out_pkt., else send out_data. unless - // there is a pending packet. + // send a out_pkt., else send out_data. unless there is a pending packet. The strategy is to + // - grab as many packets as possible while below the optimal packet size, so as to allow some packing and decrease encryption padding overhead (suposeddly) + // - limit packets size to OPTIMAL_PACKET_SIZE when sending big packets so as to keep as much QoS as possible. + if (!mPkt_wpending) -#ifdef GROUP_OUTGOING_PACKETS - { - void *dta; - mPkt_wpending_size = 0 ; - int k=0; - - while(mPkt_wpending_size < (uint32_t)maxbytes && mPkt_wpending_size < PACKET_GROUPING_SIZE_LIMIT && (dta = locked_pop_out_data())!=NULL ) - { - uint32_t s = getRsItemSize(dta); - mPkt_wpending = realloc(mPkt_wpending,s+mPkt_wpending_size) ; - memcpy( &((char*)mPkt_wpending)[mPkt_wpending_size],dta,s) ; - free(dta); - mPkt_wpending_size += s ; - ++k ; - } -#ifdef DEBUG_PQISTREAMER - if(k > 1) - std::cerr << "Packed " << k << " packets into " << mPkt_wpending_size << " bytes." << std::endl; -#endif - } -#else { - void *dta = locked_pop_out_data() ; + void *dta; + mPkt_wpending_size = 0 ; + int k=0; - if(dta != NULL) - { - mPkt_wpending = dta ; - mPkt_wpending_size = getRsItemSize(dta); - } - } + // Checks for inserting a packet slicing probe. We do that to send the other peer the information that packet slicing can be used. + // if so, we enable it for the session. This should be removed (because it's unnecessary) when all users have switched to the new version. + time_t now = time(NULL) ; + + if((!mAcceptsPacketSlicing) && now > mLastSentPacketSlicingProbe + PQISTREAM_PACKET_SLICING_PROBE_DELAY) + { +#ifdef DEBUG_PACKET_SLICING + std::cerr << "(II) Inserting packet slicing probe in traffic" << std::endl; #endif + + mPkt_wpending_size = 8 ; + mPkt_wpending = rs_malloc(8) ; + memcpy(mPkt_wpending,PACKET_SLICING_PROBE_BYTES,8) ; + + mLastSentPacketSlicingProbe = now ; + } + + uint32_t slice_size=0; + bool slice_starts=true ; + bool slice_ends=true ; + uint32_t slice_packet_id=0 ; + + do + { + int desired_packet_size = mAcceptsPacketSlicing?PQISTREAM_OPTIMAL_PACKET_SIZE:(getRsPktMaxSize()); + + dta = locked_pop_out_data(desired_packet_size,slice_size,slice_starts,slice_ends,slice_packet_id) ; + + if(!dta) + break ; + + if(slice_size > 0xffff) + { + std::cerr << "(EE) protocol error in pqitreamer: slice size is too large and cannot be encoded." ; + free(mPkt_wpending) ; + mPkt_wpending_size = 0; + } + + if(slice_starts && slice_ends) // good old method. Send the packet as is, since it's a full packet. + { +#ifdef DEBUG_PACKET_SLICING + std::cerr << "sending full slice, old style. Size=" << slice_size << std::endl; +#endif + mPkt_wpending = realloc(mPkt_wpending,slice_size+mPkt_wpending_size) ; + memcpy( &((char*)mPkt_wpending)[mPkt_wpending_size],dta,slice_size) ; + free(dta); + mPkt_wpending_size += slice_size ; + ++k ; + } + else // partial packet. We make a special header for it and insert it in the stream + { +#ifdef DEBUG_PACKET_SLICING + std::cerr << "sending partial slice, packet ID=" << std::hex << slice_packet_id << std::dec << ", size=" << slice_size << std::endl; +#endif + + mPkt_wpending = realloc(mPkt_wpending,slice_size+mPkt_wpending_size+PQISTREAM_PARTIAL_PACKET_HEADER_SIZE) ; + memcpy( &((char*)mPkt_wpending)[mPkt_wpending_size+PQISTREAM_PARTIAL_PACKET_HEADER_SIZE],dta,slice_size) ; + free(dta); + + // New2: pp ff xxxxxxxx ssss [data, sss bytes] => [flags 1B] [protocol version 1B] [2^32 packet count] [2^16 size] + + uint8_t partial_flags = 0 ; + if(slice_starts) partial_flags |= PQISTREAM_SLICE_FLAG_STARTS ; + if(slice_ends ) partial_flags |= PQISTREAM_SLICE_FLAG_ENDS ; + + ((char*)mPkt_wpending)[mPkt_wpending_size+0x00] = PQISTREAM_SLICE_PROTOCOL_VERSION_ID_01 ; + ((char*)mPkt_wpending)[mPkt_wpending_size+0x01] = partial_flags ; + ((char*)mPkt_wpending)[mPkt_wpending_size+0x02] = uint8_t(slice_packet_id >> 24) & 0xff ; + ((char*)mPkt_wpending)[mPkt_wpending_size+0x03] = uint8_t(slice_packet_id >> 16) & 0xff ; + ((char*)mPkt_wpending)[mPkt_wpending_size+0x04] = uint8_t(slice_packet_id >> 8) & 0xff ; + ((char*)mPkt_wpending)[mPkt_wpending_size+0x05] = uint8_t(slice_packet_id >> 0) & 0xff ; + ((char*)mPkt_wpending)[mPkt_wpending_size+0x06] = uint8_t(slice_size >> 8) & 0xff ; + ((char*)mPkt_wpending)[mPkt_wpending_size+0x07] = uint8_t(slice_size >> 0) & 0xff ; + + mPkt_wpending_size += slice_size + PQISTREAM_PARTIAL_PACKET_HEADER_SIZE; + ++k ; + } + } + while(mPkt_wpending_size < (uint32_t)maxbytes && mPkt_wpending_size < PQISTREAM_OPTIMAL_PACKET_SIZE ) ; + +#ifdef DEBUG_PQISTREAMER + if(k > 1) + std::cerr << "Packed " << k << " packets into " << mPkt_wpending_size << " bytes." << std::endl; +#endif + } + if (mPkt_wpending) { // write packet. #ifdef DEBUG_PQISTREAMER - std::cout << "Sending Out Pkt of size " << mPkt_wpending_size << " !" << std::endl; + std::cout << "Sending Out Pkt of size " << mPkt_wpending_size << " !" << std::endl; #endif int ss=0; @@ -507,12 +617,19 @@ int pqistreamer::handleoutgoing_locked() // std::cerr << out << std::endl ; pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out); #endif + std::cerr << PeerId() << ": sending failed. Only " << ss << " bytes sent over " << mPkt_wpending_size << std::endl; // pkt_wpending will kept til next time. // ensuring exactly the same data is written (openSSL requirement). return -1; } +#ifdef DEBUG_PQISTREAMER + else + std::cerr << PeerId() << ": sent " << ss << " bytes " << std::endl; +#endif + ++nsent; + outSentBytes_locked(mPkt_wpending_size); // this is the only time where we know exactly what was sent. #ifdef DEBUG_TRANSFERS @@ -541,309 +658,405 @@ int pqistreamer::handleoutgoing_locked() */ int pqistreamer::handleincoming_locked() { - int readbytes = 0; - static const int max_failed_read_attempts = 2000 ; + int readbytes = 0; + static const int max_failed_read_attempts = 2000 ; #ifdef DEBUG_PQISTREAMER - pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::handleincoming_locked()"); + pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::handleincoming_locked()"); #endif - if(!(mBio->isactive())) - { - mReading_state = reading_state_initial ; - free_rpend_locked(); - return 0; - } + if(!(mBio->isactive())) + { + mReading_state = reading_state_initial ; + free_rpend_locked(); + return 0; + } else - allocate_rpend_locked(); + allocate_rpend_locked(); - // enough space to read any packet. - int maxlen = mPkt_rpend_size; - void *block = mPkt_rpending; + // enough space to read any packet. + int maxlen = mPkt_rpend_size; + void *block = mPkt_rpending; - // initial read size: basic packet. - int blen = getRsPktBaseSize(); + // initial read size: basic packet. + int blen = getRsPktBaseSize(); // this is valid for both packet slices and normal un-sliced packets (same header size) - int maxin = inAllowedBytes_locked(); + int maxin = inAllowedBytes_locked(); #ifdef DEBUG_PQISTREAMER - std::cerr << "[" << (void*)pthread_self() << "] " << "reading state = " << mReading_state << std::endl ; + std::cerr << "[" << (void*)pthread_self() << "] " << "reading state = " << mReading_state << std::endl ; #endif - switch(mReading_state) - { - case reading_state_initial: /*std::cerr << "jumping to start" << std::endl; */ goto start_packet_read ; - case reading_state_packet_started: /*std::cerr << "jumping to middle" << std::endl;*/ goto continue_packet ; - } + switch(mReading_state) + { + case reading_state_initial: /*std::cerr << "jumping to start" << std::endl; */ goto start_packet_read ; + case reading_state_packet_started: /*std::cerr << "jumping to middle" << std::endl;*/ goto continue_packet ; + } start_packet_read: - { // scope to ensure variable visibility - // read the basic block (minimum packet size) - int tmplen; + { // scope to ensure variable visibility + // read the basic block (minimum packet size) + int tmplen; #ifdef DEBUG_PQISTREAMER - std::cerr << "[" << (void*)pthread_self() << "] " << "starting packet" << std::endl ; + std::cerr << "[" << (void*)pthread_self() << "] " << "starting packet" << std::endl ; #endif - memset(block,0,blen) ; // reset the block, to avoid uninitialized memory reads. + memset(block,0,blen) ; // reset the block, to avoid uninitialized memory reads. - if (blen != (tmplen = mBio->readdata(block, blen))) - { - pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "pqistreamer::handleincoming() Didn't read BasePkt!"); + if (blen != (tmplen = mBio->readdata(block, blen))) + { + pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "pqistreamer::handleincoming() Didn't read BasePkt!"); - // error.... (either blocked or failure) - if (tmplen == 0) - { + // error.... (either blocked or failure) + if (tmplen == 0) + { #ifdef DEBUG_PQISTREAMER - // most likely blocked! - pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "pqistreamer::handleincoming() read blocked"); - std::cerr << "[" << (void*)pthread_self() << "] " << "given up 1" << std::endl ; + // most likely blocked! + pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "pqistreamer::handleincoming() read blocked"); + std::cerr << "[" << (void*)pthread_self() << "] " << "given up 1" << std::endl ; #endif - return 0; - } - else if (tmplen < 0) - { - // Most likely it is that the packet is pending but could not be read by pqissl because of stream flow. - // So we return without an error, and leave the machine state in 'start_read'. - // - //pqioutput(PQL_WARNING, pqistreamerzone, "pqistreamer::handleincoming() Error in bio read"); + return 0; + } + else if (tmplen < 0) + { + // Most likely it is that the packet is pending but could not be read by pqissl because of stream flow. + // So we return without an error, and leave the machine state in 'start_read'. + // + //pqioutput(PQL_WARNING, pqistreamerzone, "pqistreamer::handleincoming() Error in bio read"); #ifdef DEBUG_PQISTREAMER - std::cerr << "[" << (void*)pthread_self() << "] " << "given up 2, state = " << mReading_state << std::endl ; + std::cerr << "[" << (void*)pthread_self() << "] " << "given up 2, state = " << mReading_state << std::endl ; #endif - return 0; - } - else // tmplen > 0 - { - // strange case....This should never happen as partial reads are handled by pqissl below. + return 0; + } + else // tmplen > 0 + { + // strange case....This should never happen as partial reads are handled by pqissl below. #ifdef DEBUG_PQISTREAMER - std::string out = "pqistreamer::handleincoming() Incomplete "; - rs_sprintf_append(out, "(Strange) read of %d bytes", tmplen); - pqioutput(PQL_ALERT, pqistreamerzone, out); + std::string out = "pqistreamer::handleincoming() Incomplete "; + rs_sprintf_append(out, "(Strange) read of %d bytes", tmplen); + pqioutput(PQL_ALERT, pqistreamerzone, out); - std::cerr << "[" << (void*)pthread_self() << "] " << "given up 3" << std::endl ; + std::cerr << "[" << (void*)pthread_self() << "] " << "given up 3" << std::endl ; #endif - return -1; - } - } + return -1; + } + } #ifdef DEBUG_PQISTREAMER - std::cerr << "[" << (void*)pthread_self() << "] " << "block 0 : " << (int)(((unsigned char*)block)[0]) << " " << (int)(((unsigned char*)block)[1]) << " " << (int)(((unsigned char*)block)[2]) << " " - << (int)(((unsigned char*)block)[3]) << " " - << (int)(((unsigned char*)block)[4]) << " " - << (int)(((unsigned char*)block)[5]) << " " - << (int)(((unsigned char*)block)[6]) << " " - << (int)(((unsigned char*)block)[7]) << " " << std::endl ; + std::cerr << "[" << (void*)pthread_self() << "] " << "block 0 : " << RsUtil::BinToHex(block,8) << std::endl; #endif - readbytes += blen; - mReading_state = reading_state_packet_started ; - mFailed_read_attempts = 0 ; // reset failed read, as the packet has been totally read. - } + readbytes += blen; + mReading_state = reading_state_packet_started ; + mFailed_read_attempts = 0 ; // reset failed read, as the packet has been totally read. + + // Check for packet slicing probe (04/26/2016). To be removed when everyone uses it. + + if(!memcmp(block,PACKET_SLICING_PROBE_BYTES,8)) + { + mAcceptsPacketSlicing = true ; +#ifdef DEBUG_PACKET_SLICING + std::cerr << "(II) Enabling packet slicing!" << std::endl; +#endif + } + } continue_packet: - { - // workout how much more to read. - int extralen = getRsItemSize(block) - blen; + { + // workout how much more to read. + + bool is_partial_packet = false ; + bool is_packet_starting = (((char*)block)[1] == PQISTREAM_SLICE_FLAG_STARTS) ; // STARTS and ENDS flags are actually never combined. + bool is_packet_ending = (((char*)block)[1] == PQISTREAM_SLICE_FLAG_ENDS) ; + bool is_packet_middle = (((char*)block)[1] == 0x00) ; + + uint32_t extralen =0; + uint32_t slice_packet_id =0; + + if( ((char*)block)[0] == PQISTREAM_SLICE_PROTOCOL_VERSION_ID_01 && ( is_packet_starting || is_packet_middle || is_packet_ending)) + { + extralen = (uint32_t(((uint8_t*)block)[6]) << 8 ) + (uint32_t(((uint8_t*)block)[7])); + slice_packet_id = (uint32_t(((uint8_t*)block)[2]) << 24) + (uint32_t(((uint8_t*)block)[3]) << 16) + (uint32_t(((uint8_t*)block)[4]) << 8) + (uint32_t(((uint8_t*)block)[5]) << 0); + +#ifdef DEBUG_PACKET_SLICING + std::cerr << "Reading partial packet from mem block " << RsUtil::BinToHex((char*)block,8) << ": packet_id=" << std::hex << slice_packet_id << std::dec << ", len=" << extralen << std::endl; +#endif + is_partial_packet = true ; + + mAcceptsPacketSlicing = true ; // this is needed + } + else + extralen = getRsItemSize(block) - blen; // old style packet type #ifdef DEBUG_PQISTREAMER - std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet getRsItemSize(block) = " << getRsItemSize(block) << std::endl ; - std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet extralen = " << extralen << std::endl ; + std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet getRsItemSize(block) = " << getRsItemSize(block) << std::endl ; + std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet extralen = " << extralen << std::endl ; - std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << mReading_state << std::endl ; - std::cerr << "[" << (void*)pthread_self() << "] " << "block 1 : " << (int)(((unsigned char*)block)[0]) << " " << (int)(((unsigned char*)block)[1]) << " " << (int)(((unsigned char*)block)[2]) << " " << (int)(((unsigned char*)block)[3]) << " " - << (int)(((unsigned char*)block)[4]) << " " - << (int)(((unsigned char*)block)[5]) << " " - << (int)(((unsigned char*)block)[6]) << " " - << (int)(((unsigned char*)block)[7]) << " " << std::endl ; + std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << mReading_state << std::endl ; + std::cerr << "[" << (void*)pthread_self() << "] " << "block 1 : " << RsUtil::BinToHex(block,8) << std::endl; #endif - if (extralen > maxlen - blen) - { - pqioutput(PQL_ALERT, pqistreamerzone, "ERROR: Read Packet too Big!"); + if (extralen > maxlen - blen) + { + pqioutput(PQL_ALERT, pqistreamerzone, "ERROR: Read Packet too Big!"); - p3Notify *notify = RsServer::notify(); - if (notify) - { - std::string title = - "Warning: Bad Packet Read"; + p3Notify *notify = RsServer::notify(); + if (notify) + { + std::string title = + "Warning: Bad Packet Read"; - std::string msg; - msg = " **** WARNING **** \n"; - msg += "Retroshare has caught a BAD Packet Read"; - msg += "\n"; - msg += "This is normally caused by connecting to an"; - msg += " OLD version of Retroshare"; - msg += "\n"; - rs_sprintf_append(msg, "(M:%d B:%d E:%d)\n", maxlen, blen, extralen); - msg += "\n"; - rs_sprintf_append(msg, "block = %d %d %d %d %d %d %d %d\n", - (int)(((unsigned char*)block)[0]), - (int)(((unsigned char*)block)[1]), - (int)(((unsigned char*)block)[2]), - (int)(((unsigned char*)block)[3]), - (int)(((unsigned char*)block)[4]), - (int)(((unsigned char*)block)[5]), - (int)(((unsigned char*)block)[6]), - (int)(((unsigned char*)block)[7])) ; - msg += "\n"; - msg += "Please get your friends to upgrade to the latest version"; - msg += "\n"; - msg += "\n"; - msg += "If you are sure the error was not caused by an old version"; - msg += "\n"; - msg += "Please report the problem to Retroshare's developers"; - msg += "\n"; + std::string msg; + msg = " **** WARNING **** \n"; + msg += "Retroshare has caught a BAD Packet Read"; + msg += "\n"; + msg += "This is normally caused by connecting to an"; + msg += " OLD version of Retroshare"; + msg += "\n"; + rs_sprintf_append(msg, "(M:%d B:%d E:%d)\n", maxlen, blen, extralen); + msg += "\n"; + msg += "block = " ; + msg += RsUtil::BinToHex((char*)block,8); - notify->AddLogMessage(0, RS_SYS_WARNING, title, msg); + msg += "\n"; + msg += "Please get your friends to upgrade to the latest version"; + msg += "\n"; + msg += "\n"; + msg += "If you are sure the error was not caused by an old version"; + msg += "\n"; + msg += "Please report the problem to Retroshare's developers"; + msg += "\n"; - std::cerr << "pqistreamer::handle_incoming() ERROR: Read Packet too Big" << std::endl; - std::cerr << msg; - std::cerr << std::endl; + notify->AddLogMessage(0, RS_SYS_WARNING, title, msg); - } - mBio->close(); - mReading_state = reading_state_initial ; // restart at state 1. - mFailed_read_attempts = 0 ; - return -1; + std::cerr << "pqistreamer::handle_incoming() ERROR: Read Packet too Big" << std::endl; + std::cerr << msg; + std::cerr << std::endl; - // Used to exit now! exit(1); - } + } + mBio->close(); + mReading_state = reading_state_initial ; // restart at state 1. + mFailed_read_attempts = 0 ; + return -1; - if (extralen > 0) - { - void *extradata = (void *) (((char *) block) + blen); - int tmplen ; + // Used to exit now! exit(1); + } - // Don't reset the block now! If pqissl is in the middle of a multiple-chunk - // packet (larger than 16384 bytes), and pqistreamer jumped directly yo - // continue_packet:, then readdata is going to write after the beginning of - // extradata, yet not exactly at start -> the start of the packet would be wiped out. - // - // so, don't do that: - // memset( extradata,0,extralen ) ; + if (extralen > 0) + { + void *extradata = (void *) (((char *) block) + blen); + int tmplen ; - if (extralen != (tmplen = mBio->readdata(extradata, extralen))) - { + // Don't reset the block now! If pqissl is in the middle of a multiple-chunk + // packet (larger than 16384 bytes), and pqistreamer jumped directly yo + // continue_packet:, then readdata is going to write after the beginning of + // extradata, yet not exactly at start -> the start of the packet would be wiped out. + // + // so, don't do that: + // memset( extradata,0,extralen ) ; + + if (extralen != (tmplen = mBio->readdata(extradata, extralen))) + { #ifdef DEBUG_PQISTREAMER - if(tmplen > 0) - std::cerr << "[" << (void*)pthread_self() << "] " << "Incomplete packet read ! This is a real problem ;-)" << std::endl ; + if(tmplen > 0) + std::cerr << "[" << (void*)pthread_self() << "] " << "Incomplete packet read ! This is a real problem ;-)" << std::endl ; #endif - if(++mFailed_read_attempts > max_failed_read_attempts) - { - std::string out; - rs_sprintf(out, "Error Completing Read (read %d/%d)", tmplen, extralen); - std::cerr << out << std::endl ; - pqioutput(PQL_ALERT, pqistreamerzone, out); + if(++mFailed_read_attempts > max_failed_read_attempts) + { + std::string out; + rs_sprintf(out, "Error Completing Read (read %d/%d)", tmplen, extralen); + std::cerr << out << std::endl ; + pqioutput(PQL_ALERT, pqistreamerzone, out); - p3Notify *notify = RsServer::notify(); - if (notify) - { - std::string title = "Warning: Error Completing Read"; + p3Notify *notify = RsServer::notify(); + if (notify) + { + std::string title = "Warning: Error Completing Read"; - std::string msgout; - msgout = " **** WARNING **** \n"; - msgout += "Retroshare has experienced an unexpected Read ERROR"; - msgout += "\n"; - rs_sprintf_append(msgout, "(M:%d B:%d E:%d R:%d)\n", maxlen, blen, extralen, tmplen); - msgout += "\n"; - msgout += "Note: this error might as well happen (rarely) when a peer disconnects in between a transmission of a large packet.\n"; - msgout += "If it happens manny time, please contact the developers, and send them these numbers:"; - msgout += "\n"; + std::string msgout; + msgout = " **** WARNING **** \n"; + msgout += "Retroshare has experienced an unexpected Read ERROR"; + msgout += "\n"; + rs_sprintf_append(msgout, "(M:%d B:%d E:%d R:%d)\n", maxlen, blen, extralen, tmplen); + msgout += "\n"; + msgout += "Note: this error might as well happen (rarely) when a peer disconnects in between a transmission of a large packet.\n"; + msgout += "If it happens manny time, please contact the developers, and send them these numbers:"; + msgout += "\n"; - rs_sprintf_append(msgout, "block = %d %d %d %d %d %d %d %d\n", - (int)(((unsigned char*)block)[0]), - (int)(((unsigned char*)block)[1]), - (int)(((unsigned char*)block)[2]), - (int)(((unsigned char*)block)[3]), - (int)(((unsigned char*)block)[4]), - (int)(((unsigned char*)block)[5]), - (int)(((unsigned char*)block)[6]), - (int)(((unsigned char*)block)[7])); + msgout += "block = " ; + msgout += RsUtil::BinToHex((char*)block,8) + "\n" ; - //notify->AddSysMessage(0, RS_SYS_WARNING, title, msgout.str()); + std::cerr << msgout << std::endl; + } - std::cerr << msgout << std::endl; - } - - mBio->close(); - mReading_state = reading_state_initial ; // restart at state 1. - mFailed_read_attempts = 0 ; - return -1; - } - else - { + mBio->close(); + mReading_state = reading_state_initial ; // restart at state 1. + mFailed_read_attempts = 0 ; + return -1; + } + else + { #ifdef DEBUG_PQISTREAMER - std::cerr << "[" << (void*)pthread_self() << "] " << "given up 5, state = " << mReading_state << std::endl ; + std::cerr << "[" << (void*)pthread_self() << "] " << "given up 5, state = " << mReading_state << std::endl ; #endif - return 0 ; // this is just a SSL_WANT_READ error. Don't panic, we'll re-try the read soon. - // we assume readdata() returned either -1 or the complete read size. - } - } + return 0 ; // this is just a SSL_WANT_READ error. Don't panic, we'll re-try the read soon. + // we assume readdata() returned either -1 or the complete read size. + } + } #ifdef DEBUG_PQISTREAMER - std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << mReading_state << std::endl ; - std::cerr << "[" << (void*)pthread_self() << "] " << "block 2 : " << (int)(((unsigned char*)extradata)[0]) << " " << (int)(((unsigned char*)extradata)[1]) << " " << (int)(((unsigned char*)extradata)[2]) << " " << (int)(((unsigned char*)extradata)[3]) << " " - << (int)(((unsigned char*)extradata)[4]) << " " - << (int)(((unsigned char*)extradata)[5]) << " " - << (int)(((unsigned char*)extradata)[6]) << " " - << (int)(((unsigned char*)extradata)[7]) << " " << std::endl ; + std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << mReading_state << std::endl ; + std::cerr << "[" << (void*)pthread_self() << "] " << "block 2 : " << RsUtil::BinToHex(extradata,8) << std::endl; #endif - mFailed_read_attempts = 0 ; - readbytes += extralen; - } + mFailed_read_attempts = 0 ; + readbytes += extralen; + } - // create packet, based on header. + // create packet, based on header. #ifdef DEBUG_PQISTREAMER - { - std::string out; - rs_sprintf(out, "Read Data Block -> Incoming Pkt(%d)", blen + extralen); - //std::cerr << out ; - pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out); - } + { + std::string out; + rs_sprintf(out, "Read Data Block -> Incoming Pkt(%d)", blen + extralen); + //std::cerr << out ; + pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out); + } #endif - // std::cerr << "Deserializing packet of size " << pktlen <deserialise(block, &pktlen); - if ((pkt != NULL) && (0 < handleincomingitem_locked(pkt,pktlen))) - { + if ((pkt != NULL) && (0 < handleincomingitem_locked(pkt,pktlen))) + { #ifdef DEBUG_PQISTREAMER - pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "Successfully Read a Packet!"); + pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "Successfully Read a Packet!"); #endif - inReadBytes_locked(pktlen); // only count deserialised packets, because that's what is actually been transfered. - } - else - { + inReadBytes_locked(pktlen); // only count deserialised packets, because that's what is actually been transfered. + } + else if (!is_partial_packet) + { #ifdef DEBUG_PQISTREAMER - pqioutput(PQL_ALERT, pqistreamerzone, "Failed to handle Packet!"); + pqioutput(PQL_ALERT, pqistreamerzone, "Failed to handle Packet!"); #endif - std::cerr << "Incoming Packet could not be deserialised:" << std::endl; - std::cerr << " Incoming peer id: " << PeerId() << std::endl; - if(pktlen >= 8) - std::cerr << " Packet header : " << RsUtil::BinToHex((unsigned char*)block,8) << std::endl; - if(pktlen > 8) - std::cerr << " Packet data : " << RsUtil::BinToHex((unsigned char*)block+8,std::min(50u,pktlen-8)) << ((pktlen>58)?"...":"") << std::endl; - } + std::cerr << "Incoming Packet could not be deserialised:" << std::endl; + std::cerr << " Incoming peer id: " << PeerId() << std::endl; + if(pktlen >= 8) + std::cerr << " Packet header : " << RsUtil::BinToHex((unsigned char*)block,8) << std::endl; + if(pktlen > 8) + std::cerr << " Packet data : " << RsUtil::BinToHex((unsigned char*)block+8,std::min(50u,pktlen-8)) << ((pktlen>58)?"...":"") << std::endl; + } - mReading_state = reading_state_initial ; // restart at state 1. - mFailed_read_attempts = 0 ; // reset failed read, as the packet has been totally read. - } + mReading_state = reading_state_initial ; // restart at state 1. + mFailed_read_attempts = 0 ; // reset failed read, as the packet has been totally read. + } - if(maxin > readbytes && mBio->moretoread(0)) - goto start_packet_read ; + if(maxin > readbytes && mBio->moretoread(0)) + goto start_packet_read ; #ifdef DEBUG_TRANSFERS - if (readbytes >= maxin) - { - std::cerr << "pqistreamer::handleincoming() Stopped reading as readbytes >= maxin. Read " << readbytes << " bytes "; - std::cerr << std::endl; - } + if (readbytes >= maxin) + { + std::cerr << "pqistreamer::handleincoming() Stopped reading as readbytes >= maxin. Read " << readbytes << " bytes "; + std::cerr << std::endl; + } #endif - return 0; + return 0; } +RsItem *pqistreamer::addPartialPacket(const void *block,uint32_t len,uint32_t slice_packet_id,bool is_packet_starting,bool is_packet_ending) +{ +#ifdef DEBUG_PACKET_SLICING + std::cerr << "Receiving partial packet. size=" << len << ", ID=" << std::hex << slice_packet_id << std::dec << ", starting:" << is_packet_starting << ", ending:" << is_packet_ending ; +#endif + + if(is_packet_starting && is_packet_ending) + { + std::cerr << " (EE) unexpected situation: both starting and ending" << std::endl; + return NULL ; + } + + uint32_t slice_length = len - PQISTREAM_PARTIAL_PACKET_HEADER_SIZE ; + unsigned char *slice_data = &((unsigned char*)block)[PQISTREAM_PARTIAL_PACKET_HEADER_SIZE] ; + + std::map::iterator it = mPartialPackets.find(slice_packet_id) ; + + if(it == mPartialPackets.end()) + { + // make sure we really have a starting packet. Otherwise this is an error. + + if(!is_packet_starting) + { + std::cerr << " (EE) non starting packet has no record. Dropping" << std::endl; + return NULL ; + } + PartialPacketRecord& rec = mPartialPackets[slice_packet_id] ; + + rec.mem = rs_malloc(slice_length) ; + + if(!rec.mem) + { + std::cerr << " (EE) Cannot allocate memory for slice of size " << slice_length << std::endl; + return NULL ; + } + + memcpy(rec.mem, slice_data, slice_length) ; ; + rec.size = slice_length ; + +#ifdef DEBUG_PACKET_SLICING + std::cerr << " => stored in new record (size=" << rec.size << std::endl; +#endif + + return NULL ; // no need to check for ending + } + else + { + PartialPacketRecord& rec = it->second ; + + if(is_packet_starting) + { + std::cerr << "(WW) dropping unfinished existing packet that gets to be replaced by new starting packet." << std::endl; + free(rec.mem); + rec.size = 0 ; + } + // make sure this is a continuing packet, otherwise this is an error. + + rec.mem = realloc(rec.mem, rec.size + slice_length) ; + memcpy( &((char*)rec.mem)[rec.size],slice_data,slice_length) ; + rec.size += slice_length ; + +#ifdef DEBUG_PACKET_SLICING + std::cerr << " => added to existing record size=" << rec.size ; +#endif + + if(is_packet_ending) + { +#ifdef DEBUG_PACKET_SLICING + std::cerr << " => deserialising: mem=" << RsUtil::BinToHex((char*)rec.mem,std::min(8u,rec.size)) << std::endl; +#endif + RsItem *item = mRsSerialiser->deserialise(rec.mem, &rec.size); + + free(rec.mem) ; + mPartialPackets.erase(it) ; + return item ; + } + else + { +#ifdef DEBUG_PACKET_SLICING + std::cerr << std::endl; +#endif + return NULL ; + } + } +} /* BandWidth Management Assistance */ @@ -1074,8 +1287,13 @@ int pqistreamer::locked_gatherStatistics(std::list& out_lst,std:: return 1 ; } -void *pqistreamer::locked_pop_out_data() +void *pqistreamer::locked_pop_out_data(uint32_t max_slice_size,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id) { + size = 0 ; + starts = true ; + ends = true ; + packet_id = 0 ; + void *res = NULL ; if (!mOutPkts.empty()) @@ -1089,3 +1307,5 @@ void *pqistreamer::locked_pop_out_data() } return res ; } + + diff --git a/libretroshare/src/pqi/pqistreamer.h b/libretroshare/src/pqi/pqistreamer.h index ff1cf0a12..b50244622 100644 --- a/libretroshare/src/pqi/pqistreamer.h +++ b/libretroshare/src/pqi/pqistreamer.h @@ -40,6 +40,12 @@ // The interface does not handle connection, just communication. // possible bioflags: BIN_FLAGS_NO_CLOSE | BIN_FLAGS_NO_DELETE +struct PartialPacketRecord +{ + void *mem ; + uint32_t size ; +}; + class pqistreamer: public PQInterface { public: @@ -71,13 +77,12 @@ class pqistreamer: public PQInterface // These methods are redefined in pqiQoSstreamer // - virtual void locked_storeInOutputQueue(void *ptr,int priority) ; + virtual void locked_storeInOutputQueue(void *ptr, int size, int priority) ; virtual int locked_out_queue_size() const ; virtual void locked_clear_out_queue() ; virtual int locked_compute_out_pkt_size() const ; - virtual void *locked_pop_out_data() ; - //virtual int locked_gatherStatistics(std::vector& per_service_count,std::vector& per_priority_count) const; // extracting data. - virtual int locked_gatherStatistics(std::list& outqueue_stats,std::list& inqueue_stats); // extracting data. + virtual void *locked_pop_out_data(uint32_t max_slice_size,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id); + virtual int locked_gatherStatistics(std::list& outqueue_stats,std::list& inqueue_stats); // extracting data. void updateRates() ; @@ -156,7 +161,12 @@ class pqistreamer: public PQInterface std::list mCurrentStatsChunk_Out ; time_t mStatisticsTimeStamp ; - void locked_addTrafficClue(const RsItem *pqi, uint32_t pktsize, std::list &lst); + bool mAcceptsPacketSlicing ; + time_t mLastSentPacketSlicingProbe ; + void locked_addTrafficClue(const RsItem *pqi, uint32_t pktsize, std::list &lst); + RsItem *addPartialPacket(const void *block, uint32_t len, uint32_t slice_packet_id,bool packet_starting,bool packet_ending); + + std::map mPartialPackets ; }; #endif //MRK_PQI_STREAMER_HEADER diff --git a/libretroshare/src/rsserver/p3face-server.cc b/libretroshare/src/rsserver/p3face-server.cc index b76913693..037ef3a44 100644 --- a/libretroshare/src/rsserver/p3face-server.cc +++ b/libretroshare/src/rsserver/p3face-server.cc @@ -69,8 +69,12 @@ static double getCurrentTS() return cts; } -const double RsServer::minTimeDelta = 0.1; // 25; -const double RsServer::maxTimeDelta = 0.5; +// These values should be tunable from the GUI, to offer a compromise between speed and CPU use. +// In some cases (VOIP) it's likely that we will need to set them temporarily to a very low +// value, in order to favor a fast feedback + +const double RsServer::minTimeDelta = 0.05; // 25; +const double RsServer::maxTimeDelta = 0.2; const double RsServer::kickLimit = 0.15; @@ -138,7 +142,7 @@ void RsServer::data_tick() double ts = getCurrentTS(); double delta = ts - mLastts; - + /* for the fast ticked stuff */ if (delta > mTimeDelta) { diff --git a/libretroshare/src/serialiser/rsserviceids.h b/libretroshare/src/serialiser/rsserviceids.h index b7d7b2fa1..7aadd3dea 100644 --- a/libretroshare/src/serialiser/rsserviceids.h +++ b/libretroshare/src/serialiser/rsserviceids.h @@ -96,6 +96,9 @@ const uint16_t RS_SERVICE_TYPE_PLUGIN_ARADO_ID = 0x2001; const uint16_t RS_SERVICE_TYPE_PLUGIN_QCHESS_ID = 0x2002; const uint16_t RS_SERVICE_TYPE_PLUGIN_FEEDREADER = 0x2003; +// Reserved for packet slicing probes. +const uint16_t RS_SERVICE_TYPE_PACKET_SLICING_PROBE = 0xAABB; + // Nabu's services. const uint16_t RS_SERVICE_TYPE_PLUGIN_FIDO_GW = 0xF1D0; const uint16_t RS_SERVICE_TYPE_PLUGIN_ZERORESERVE = 0xBEEF; diff --git a/libretroshare/src/services/p3rtt.cc b/libretroshare/src/services/p3rtt.cc index 1e3c96b61..751eccba4 100644 --- a/libretroshare/src/services/p3rtt.cc +++ b/libretroshare/src/services/p3rtt.cc @@ -23,6 +23,8 @@ * */ +#include + #include "util/rsdir.h" #include "retroshare/rsiface.h" #include "pqi/pqibin.h" @@ -38,7 +40,6 @@ * #define DEBUG_RTT 1 ****/ - /* DEFINE INTERFACE POINTER! */ RsRtt *rsRtt = NULL; @@ -168,7 +169,7 @@ int p3rtt::sendPackets() pt = mSentPingTime; } - if (now - pt > RTT_PING_PERIOD) + if (now >= pt+RTT_PING_PERIOD) { sendPingMeasurements(); @@ -190,19 +191,10 @@ void p3rtt::sendPingMeasurements() mServiceCtrl->getPeersConnected(getServiceInfo().mServiceType, idList); -#ifdef DEBUG_RTT - std::cerr << "p3rtt::sendPingMeasurements() @ts: " << ts; - std::cerr << std::endl; -#endif - /* prepare packets */ std::set::iterator it; for(it = idList.begin(); it != idList.end(); ++it) { -#ifdef DEBUG_RTT - std::cerr << "p3rtt::sendPingMeasurements() Pinging: " << *it; - std::cerr << std::endl; -#endif double ts = getCurrentTS(); /* create the packet */ @@ -214,11 +206,8 @@ void p3rtt::sendPingMeasurements() storePingAttempt(*it, ts, mCounter); #ifdef DEBUG_RTT - std::cerr << "p3rtt::sendPingMeasurements() With Packet:"; - std::cerr << std::endl; - pingPkt->print(std::cerr, 10); + std::cerr << "p3rtt::sendPingMeasurements() Pinging: " << *it << " [" << pingPkt->mSeqNo << "," << std::hex << pingPkt->mPingTS << std::dec << "]" << std::endl;; #endif - sendItem(pingPkt); } @@ -256,30 +245,28 @@ int p3rtt::handlePing(RsItem *item) /* cast to right type */ RsRttPingItem *ping = (RsRttPingItem *) item; + double ts = getCurrentTS(); #ifdef DEBUG_RTT - std::cerr << "p3rtt::handlePing() Recvd Packet from: " << ping->PeerId(); - std::cerr << std::endl; + std::cerr << "p3rtt::handlePing() from: " << ping->PeerId() << " - [" << ping->mSeqNo << "," << std::hex << ping->mPingTS << std::dec << "] " << std::endl; + std::cerr << "incoming ping travel time: " << ts - convert64bitsToTs(ping->mPingTS) << std::endl; #endif /* with a ping, we just respond as quickly as possible - they do all the analysis */ RsRttPongItem *pong = new RsRttPongItem(); - pong->PeerId(ping->PeerId()); pong->mPingTS = ping->mPingTS; pong->mSeqNo = ping->mSeqNo; // add our timestamp. - double ts = getCurrentTS(); pong->mPongTS = convertTsTo64bits(ts); - + static double mLastResponseToPong = 0.0 ;// bad stuff #ifdef DEBUG_RTT - std::cerr << "p3rtt::handlePing() With Packet:"; - std::cerr << std::endl; - pong->print(std::cerr, 10); + std::cerr << "Delay since last response to PONG: " << ts - mLastResponseToPong << std::endl; #endif - + + mLastResponseToPong = ts ; sendItem(pong); return true ; } @@ -291,9 +278,7 @@ int p3rtt::handlePong(RsItem *item) RsRttPongItem *pong = (RsRttPongItem *) item; #ifdef DEBUG_RTT - std::cerr << "p3rtt::handlePong() Recvd Packet from: " << pong->PeerId(); - std::cerr << std::endl; - pong->print(std::cerr, 10); + std::cerr << "p3rtt::handlePong() from: " << pong->PeerId() << " - [" << pong->mSeqNo << "," << std::hex << pong->mPingTS << " -> " << pong->mPongTS << std::dec << "] "<< std::endl; #endif /* with a pong, we do the maths! */ @@ -305,21 +290,12 @@ int p3rtt::handlePong(RsItem *item) double offset = pongTS - (recvTS - rtt / 2.0); // so to get to their time, we go ourTS + offset. #ifdef DEBUG_RTT - std::cerr << "p3rtt::handlePong() Timing:"; - std::cerr << std::endl; - std::cerr << "\tpingTS: " << pingTS; - std::cerr << std::endl; - std::cerr << "\tpongTS: " << pongTS; - std::cerr << std::endl; - std::cerr << "\trecvTS: " << recvTS; - std::cerr << std::endl; - std::cerr << "\t ==> rtt: " << rtt; - std::cerr << std::endl; - std::cerr << "\t ==> offset: " << offset; - std::cerr << std::endl; + std::cerr << "incoming pong travel time: " << recvTS - convert64bitsToTs(pong->mPongTS) << std::endl; + std::cerr << " RTT analysis: pingTS: " << std::setprecision(16) << pingTS << ", pongTS: " << pongTS + << ", recvTS: " << std::setprecision(16) << recvTS << " ==> rtt: " << rtt << ", offset: " << offset << std::endl; #endif - storePongResult(pong->PeerId(), pong->mSeqNo, pingTS, rtt, offset); + storePongResult(pong->PeerId(), pong->mSeqNo, recvTS, rtt, offset); return true ; } @@ -333,6 +309,9 @@ int p3rtt::storePingAttempt(const RsPeerId& id, double ts, uint32_t seqno) /* find corresponding local data */ RttPeerInfo *peerInfo = locked_GetPeerInfo(id); +#ifdef DEBUG_RTT + std::cerr << "Delay since previous ping attempt: " << ts - peerInfo->mCurrentPingTS << std::endl; +#endif peerInfo->mCurrentPingTS = ts; peerInfo->mCurrentPingCounter = seqno; @@ -349,7 +328,7 @@ int p3rtt::storePingAttempt(const RsPeerId& id, double ts, uint32_t seqno) -int p3rtt::storePongResult(const RsPeerId& id, uint32_t counter, double ts, double rtt, double offset) +int p3rtt::storePongResult(const RsPeerId& id, uint32_t counter, double recv_ts, double rtt, double offset) { RsStackMutex stack(mRttMtx); /****** LOCKED MUTEX *******/ @@ -366,8 +345,12 @@ int p3rtt::storePongResult(const RsPeerId& id, uint32_t counter, double ts, doub { peerInfo->mCurrentPongRecvd = true; } +#ifdef DEBUG_RTT + if(!peerInfo->mPongResults.empty()) + std::cerr << "Delay since last pong: " << recv_ts - peerInfo->mPongResults.back().mTS << std::endl; +#endif - peerInfo->mPongResults.push_back(RsRttPongResult(ts, rtt, offset)); + peerInfo->mPongResults.push_back(RsRttPongResult(recv_ts, rtt, offset)); while(peerInfo->mPongResults.size() > MAX_PONG_RESULTS) diff --git a/libretroshare/src/services/p3rtt.h b/libretroshare/src/services/p3rtt.h index 0b7e61f46..9b881fba8 100644 --- a/libretroshare/src/services/p3rtt.h +++ b/libretroshare/src/services/p3rtt.h @@ -84,7 +84,7 @@ virtual bool recvItem(RsItem *item); // Overloaded from p3FastService. int handlePong(RsItem *item); int storePingAttempt(const RsPeerId& id, double ts, uint32_t mCounter); - int storePongResult(const RsPeerId& id, uint32_t counter, double ts, double rtt, double offset); + int storePongResult(const RsPeerId& id, uint32_t counter, double recv_ts, double rtt, double offset); /*!