mirror of
https://github.com/RetroShare/RetroShare.git
synced 2024-10-01 02:35:48 -04:00
commit
baf940443d
@ -6,6 +6,8 @@
|
|||||||
|
|
||||||
#include "pqiqos.h"
|
#include "pqiqos.h"
|
||||||
|
|
||||||
|
const uint32_t pqiQoS::MAX_PACKET_COUNTER_VALUE = (1 << 24) ;
|
||||||
|
|
||||||
pqiQoS::pqiQoS(uint32_t nb_levels,float alpha)
|
pqiQoS::pqiQoS(uint32_t nb_levels,float alpha)
|
||||||
: _item_queues(nb_levels),_alpha(alpha)
|
: _item_queues(nb_levels),_alpha(alpha)
|
||||||
{
|
{
|
||||||
@ -14,6 +16,7 @@ pqiQoS::pqiQoS(uint32_t nb_levels,float alpha)
|
|||||||
float c = 1.0f ;
|
float c = 1.0f ;
|
||||||
float inc = alpha ;
|
float inc = alpha ;
|
||||||
_nb_items = 0 ;
|
_nb_items = 0 ;
|
||||||
|
_id_counter = 0 ;
|
||||||
|
|
||||||
for(int i=((int)nb_levels)-1;i>=0;--i,c *= alpha)
|
for(int i=((int)nb_levels)-1;i>=0;--i,c *= alpha)
|
||||||
{
|
{
|
||||||
@ -44,7 +47,7 @@ void pqiQoS::print() const
|
|||||||
std::cerr << std::endl;
|
std::cerr << std::endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
void pqiQoS::in_rsItem(void *ptr,int priority)
|
void pqiQoS::in_rsItem(void *ptr,int size,int priority)
|
||||||
{
|
{
|
||||||
if(uint32_t(priority) >= _item_queues.size())
|
if(uint32_t(priority) >= _item_queues.size())
|
||||||
{
|
{
|
||||||
@ -52,8 +55,11 @@ void pqiQoS::in_rsItem(void *ptr,int priority)
|
|||||||
priority = _item_queues.size()-1 ;
|
priority = _item_queues.size()-1 ;
|
||||||
}
|
}
|
||||||
|
|
||||||
_item_queues[priority].push(ptr) ;
|
_item_queues[priority].push(ptr,size,_id_counter++) ;
|
||||||
++_nb_items ;
|
++_nb_items ;
|
||||||
|
|
||||||
|
if(_id_counter >= MAX_PACKET_COUNTER_VALUE)
|
||||||
|
_id_counter = 0 ;
|
||||||
}
|
}
|
||||||
|
|
||||||
// int pqiQoS::gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const
|
// int pqiQoS::gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const
|
||||||
@ -80,7 +86,7 @@ void pqiQoS::in_rsItem(void *ptr,int priority)
|
|||||||
// }
|
// }
|
||||||
|
|
||||||
|
|
||||||
void *pqiQoS::out_rsItem()
|
void *pqiQoS::out_rsItem(uint32_t max_slice_size, uint32_t& size, bool& starts, bool& ends, uint32_t& packet_id)
|
||||||
{
|
{
|
||||||
// Go through the queues. Increment counters.
|
// Go through the queues. Increment counters.
|
||||||
|
|
||||||
@ -105,11 +111,21 @@ void *pqiQoS::out_rsItem()
|
|||||||
if(last >= 0)
|
if(last >= 0)
|
||||||
{
|
{
|
||||||
assert(_nb_items > 0) ;
|
assert(_nb_items > 0) ;
|
||||||
|
|
||||||
|
// now chop a slice of this item
|
||||||
|
|
||||||
|
void *res = _item_queues[last].slice(max_slice_size,size,starts,ends,packet_id) ;
|
||||||
|
|
||||||
|
if(ends)
|
||||||
--_nb_items ;
|
--_nb_items ;
|
||||||
return _item_queues[last].pop();
|
|
||||||
|
return res ;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
return NULL ;
|
return NULL ;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -36,14 +36,26 @@
|
|||||||
|
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <iostream>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <list>
|
#include <list>
|
||||||
|
|
||||||
|
#include <util/rsmemory.h>
|
||||||
|
|
||||||
class pqiQoS
|
class pqiQoS
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
pqiQoS(uint32_t max_levels,float alpha) ;
|
pqiQoS(uint32_t max_levels,float alpha) ;
|
||||||
|
|
||||||
|
struct ItemRecord
|
||||||
|
{
|
||||||
|
void *data ;
|
||||||
|
uint32_t current_offset ;
|
||||||
|
uint32_t size ;
|
||||||
|
uint32_t id ;
|
||||||
|
};
|
||||||
|
|
||||||
class ItemQueue
|
class ItemQueue
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@ -56,17 +68,74 @@ class pqiQoS
|
|||||||
if(_items.empty())
|
if(_items.empty())
|
||||||
return NULL ;
|
return NULL ;
|
||||||
|
|
||||||
void *item = _items.front() ;
|
void *item = _items.front().data ;
|
||||||
_items.pop_front() ;
|
_items.pop_front() ;
|
||||||
--_item_count ;
|
--_item_count ;
|
||||||
|
|
||||||
return item ;
|
return item ;
|
||||||
}
|
}
|
||||||
|
|
||||||
void push(void *item)
|
void *slice(uint32_t max_size,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id)
|
||||||
{
|
{
|
||||||
_items.push_back(item) ;
|
if(_items.empty())
|
||||||
++_item_count ;
|
return NULL ;
|
||||||
|
|
||||||
|
ItemRecord& rec(_items.front()) ;
|
||||||
|
packet_id = rec.id ;
|
||||||
|
|
||||||
|
// readily get rid of the item if it can be sent as a whole
|
||||||
|
|
||||||
|
if(rec.current_offset == 0 && rec.size < max_size)
|
||||||
|
{
|
||||||
|
starts = true ;
|
||||||
|
ends = true ;
|
||||||
|
size = rec.size ;
|
||||||
|
|
||||||
|
return pop() ;
|
||||||
|
}
|
||||||
|
starts = (rec.current_offset == 0) ;
|
||||||
|
ends = (rec.current_offset + max_size >= rec.size) ;
|
||||||
|
|
||||||
|
if(rec.size <= rec.current_offset)
|
||||||
|
{
|
||||||
|
std::cerr << "(EE) severe error in slicing in QoS." << std::endl;
|
||||||
|
pop() ;
|
||||||
|
return NULL ;
|
||||||
|
}
|
||||||
|
|
||||||
|
size = std::min(max_size, uint32_t((int)rec.size - (int)rec.current_offset)) ;
|
||||||
|
void *mem = rs_malloc(size) ;
|
||||||
|
|
||||||
|
if(!mem)
|
||||||
|
{
|
||||||
|
std::cerr << "(EE) memory allocation error in QoS." << std::endl;
|
||||||
|
pop() ;
|
||||||
|
return NULL ;
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(mem,&((unsigned char*)rec.data)[rec.current_offset],size) ;
|
||||||
|
|
||||||
|
if(ends) // we're taking the whole stuff. So we can delete the entry.
|
||||||
|
{
|
||||||
|
free(rec.data) ;
|
||||||
|
_items.pop_front() ;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
rec.current_offset += size ; // by construction, !ends implies rec.current_offset < rec.size
|
||||||
|
|
||||||
|
return mem ;
|
||||||
|
}
|
||||||
|
|
||||||
|
void push(void *item,uint32_t size,uint32_t id)
|
||||||
|
{
|
||||||
|
ItemRecord rec ;
|
||||||
|
|
||||||
|
rec.data = item ;
|
||||||
|
rec.current_offset = 0 ;
|
||||||
|
rec.size = size ;
|
||||||
|
rec.id = id ;
|
||||||
|
|
||||||
|
_items.push_back(rec) ;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t size() const { return _item_count ; }
|
uint32_t size() const { return _item_count ; }
|
||||||
@ -75,16 +144,17 @@ class pqiQoS
|
|||||||
float _counter ;
|
float _counter ;
|
||||||
float _inc ;
|
float _inc ;
|
||||||
uint32_t _item_count ;
|
uint32_t _item_count ;
|
||||||
std::list<void*> _items ;
|
|
||||||
|
std::list<ItemRecord> _items ;
|
||||||
};
|
};
|
||||||
|
|
||||||
// This function pops items from the queue, y order of priority
|
// This function pops items from the queue, y order of priority
|
||||||
//
|
//
|
||||||
void *out_rsItem() ;
|
void *out_rsItem(uint32_t max_slice_size,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id) ;
|
||||||
|
|
||||||
// This function is used to queue items.
|
// This function is used to queue items.
|
||||||
//
|
//
|
||||||
void in_rsItem(void *item,int priority) ;
|
void in_rsItem(void *item, int size, int priority) ;
|
||||||
|
|
||||||
void print() const ;
|
void print() const ;
|
||||||
uint64_t qos_queue_size() const { return _nb_items ; }
|
uint64_t qos_queue_size() const { return _nb_items ; }
|
||||||
@ -99,12 +169,15 @@ class pqiQoS
|
|||||||
|
|
||||||
void computeTotalItemSize() const ;
|
void computeTotalItemSize() const ;
|
||||||
int debug_computeTotalItemSize() const ;
|
int debug_computeTotalItemSize() const ;
|
||||||
private:
|
private:
|
||||||
// This vector stores the lists of items with equal priorities.
|
// This vector stores the lists of items with equal priorities.
|
||||||
//
|
//
|
||||||
std::vector<ItemQueue> _item_queues ;
|
std::vector<ItemQueue> _item_queues ;
|
||||||
float _alpha ;
|
float _alpha ;
|
||||||
uint64_t _nb_items ;
|
uint64_t _nb_items ;
|
||||||
|
uint32_t _id_counter ;
|
||||||
|
|
||||||
|
static const uint32_t MAX_PACKET_COUNTER_VALUE ;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -50,12 +50,12 @@ int pqiQoSstreamer::getQueueSize(bool in)
|
|||||||
// return pqiQoS::gatherStatistics(per_service_count,per_priority_count) ;
|
// return pqiQoS::gatherStatistics(per_service_count,per_priority_count) ;
|
||||||
//}
|
//}
|
||||||
|
|
||||||
void pqiQoSstreamer::locked_storeInOutputQueue(void *ptr,int priority)
|
void pqiQoSstreamer::locked_storeInOutputQueue(void *ptr,int size,int priority)
|
||||||
{
|
{
|
||||||
_total_item_size += getRsItemSize(ptr) ;
|
_total_item_size += size ;
|
||||||
++_total_item_count ;
|
++_total_item_count ;
|
||||||
|
|
||||||
pqiQoS::in_rsItem(ptr,priority) ;
|
pqiQoS::in_rsItem(ptr,size,priority) ;
|
||||||
}
|
}
|
||||||
|
|
||||||
void pqiQoSstreamer::locked_clear_out_queue()
|
void pqiQoSstreamer::locked_clear_out_queue()
|
||||||
@ -65,13 +65,15 @@ void pqiQoSstreamer::locked_clear_out_queue()
|
|||||||
_total_item_count = 0 ;
|
_total_item_count = 0 ;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *pqiQoSstreamer::locked_pop_out_data()
|
void *pqiQoSstreamer::locked_pop_out_data(uint32_t max_slice_size, uint32_t& size, bool& starts, bool& ends, uint32_t& packet_id)
|
||||||
{
|
{
|
||||||
void *out = pqiQoS::out_rsItem() ;
|
void *out = pqiQoS::out_rsItem(max_slice_size,size,starts,ends,packet_id) ;
|
||||||
|
|
||||||
if(out != NULL)
|
if(out != NULL)
|
||||||
{
|
{
|
||||||
_total_item_size -= getRsItemSize(out) ;
|
_total_item_size -= getRsItemSize(out) ;
|
||||||
|
|
||||||
|
if(ends)
|
||||||
--_total_item_count ;
|
--_total_item_count ;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,11 +36,11 @@ class pqiQoSstreamer: public pqithreadstreamer, public pqiQoS
|
|||||||
static const uint32_t PQI_QOS_STREAMER_MAX_LEVELS = 10 ;
|
static const uint32_t PQI_QOS_STREAMER_MAX_LEVELS = 10 ;
|
||||||
static const float PQI_QOS_STREAMER_ALPHA ;
|
static const float PQI_QOS_STREAMER_ALPHA ;
|
||||||
|
|
||||||
virtual void locked_storeInOutputQueue(void *ptr,int priority) ;
|
virtual void locked_storeInOutputQueue(void *ptr, int size, int priority) ;
|
||||||
virtual int locked_out_queue_size() const { return _total_item_count ; }
|
virtual int locked_out_queue_size() const { return _total_item_count ; }
|
||||||
virtual void locked_clear_out_queue() ;
|
virtual void locked_clear_out_queue() ;
|
||||||
virtual int locked_compute_out_pkt_size() const { return _total_item_size ; }
|
virtual int locked_compute_out_pkt_size() const { return _total_item_size ; }
|
||||||
virtual void *locked_pop_out_data() ;
|
virtual void *locked_pop_out_data(uint32_t max_slice_size,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id);
|
||||||
//virtual int locked_gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const; // extracting data.
|
//virtual int locked_gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const; // extracting data.
|
||||||
|
|
||||||
|
|
||||||
|
@ -1857,6 +1857,10 @@ bool pqissl::moretoread(uint32_t usec)
|
|||||||
#endif
|
#endif
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
else if(SSL_pending(ssl_connection) > 0)
|
||||||
|
{
|
||||||
|
return 1 ;
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
#ifdef PQISSL_DEBUG
|
#ifdef PQISSL_DEBUG
|
||||||
|
@ -30,6 +30,7 @@
|
|||||||
#include "util/rsdebug.h"
|
#include "util/rsdebug.h"
|
||||||
#include "util/rsstring.h"
|
#include "util/rsstring.h"
|
||||||
#include "util/rsprint.h"
|
#include "util/rsprint.h"
|
||||||
|
#include "util/rsscopetimer.h"
|
||||||
|
|
||||||
#include "pqi/pqistreamer.h"
|
#include "pqi/pqistreamer.h"
|
||||||
#include "rsserver/p3face.h"
|
#include "rsserver/p3face.h"
|
||||||
@ -41,12 +42,26 @@ const int pqistreamerzone = 8221;
|
|||||||
static const int PQISTREAM_ABS_MAX = 100000000; /* 100 MB/sec (actually per loop) */
|
static const int PQISTREAM_ABS_MAX = 100000000; /* 100 MB/sec (actually per loop) */
|
||||||
static const int PQISTREAM_AVG_PERIOD = 5; // update speed estimate every 5 seconds
|
static const int PQISTREAM_AVG_PERIOD = 5; // update speed estimate every 5 seconds
|
||||||
static const float PQISTREAM_AVG_FRAC = 0.8; // for bandpass filter over speed estimate.
|
static const float PQISTREAM_AVG_FRAC = 0.8; // for bandpass filter over speed estimate.
|
||||||
|
static const int PQISTREAM_OPTIMAL_PACKET_SIZE = 512; // It is believed that this value should be lower than TCP slices and large enough as compare to encryption padding.
|
||||||
|
// most importantly, it should be constant, so as to allow correct QoS.
|
||||||
|
static const int PQISTREAM_SLICE_FLAG_STARTS = 0x01; //
|
||||||
|
static const int PQISTREAM_SLICE_FLAG_ENDS = 0x02; // these flags should be kept in the range 0x01-0x08
|
||||||
|
static const int PQISTREAM_SLICE_PROTOCOL_VERSION_ID_01 = 0x10; // Protocol version ID. Should hold on the 4 lower bits.
|
||||||
|
static const int PQISTREAM_PARTIAL_PACKET_HEADER_SIZE = 8; // Same size than normal header, to make the code simpler.
|
||||||
|
static const int PQISTREAM_PACKET_SLICING_PROBE_DELAY = 60; // send every 60 secs.
|
||||||
|
|
||||||
|
// This is a probe packet, that won't deserialise (it's empty) but will not cause problems to old peers either, since they will ignore
|
||||||
|
// it. This packet however will be understood by new peers as a signal to enable packet slicing. This should go when all peers use the
|
||||||
|
// same protocol.
|
||||||
|
|
||||||
|
static uint8_t PACKET_SLICING_PROBE_BYTES[8] = { 0x02, 0xaa, 0xbb, 0xcc, 0x00, 0x00, 0x00, 0x08 } ;
|
||||||
|
|
||||||
/* This removes the print statements (which hammer pqidebug) */
|
/* This removes the print statements (which hammer pqidebug) */
|
||||||
/***
|
/***
|
||||||
#define RSITEM_DEBUG 1
|
#define RSITEM_DEBUG 1
|
||||||
#define DEBUG_TRANSFERS 1
|
#define DEBUG_TRANSFERS 1
|
||||||
#define DEBUG_PQISTREAMER 1
|
#define DEBUG_PQISTREAMER 1
|
||||||
|
#define DEBUG_PACKET_SLICING 1
|
||||||
***/
|
***/
|
||||||
|
|
||||||
#ifdef DEBUG_TRANSFERS
|
#ifdef DEBUG_TRANSFERS
|
||||||
@ -64,6 +79,9 @@ pqistreamer::pqistreamer(RsSerialiser *rss, const RsPeerId& id, BinInterface *bi
|
|||||||
{
|
{
|
||||||
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
|
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
|
||||||
|
|
||||||
|
mAcceptsPacketSlicing = false ; // by default. Will be turned into true when everyone's ready.
|
||||||
|
mLastSentPacketSlicingProbe = 0 ;
|
||||||
|
|
||||||
mAvgLastUpdate = mCurrReadTS = mCurrSentTS = time(NULL);
|
mAvgLastUpdate = mCurrReadTS = mCurrSentTS = time(NULL);
|
||||||
mIncomingSize = 0 ;
|
mIncomingSize = 0 ;
|
||||||
|
|
||||||
@ -95,7 +113,9 @@ pqistreamer::pqistreamer(RsSerialiser *rss, const RsPeerId& id, BinInterface *bi
|
|||||||
pqistreamer::~pqistreamer()
|
pqistreamer::~pqistreamer()
|
||||||
{
|
{
|
||||||
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
|
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
|
||||||
|
#ifdef DEBUG_PQISTREAMER
|
||||||
|
std::cerr << "Closing pqistreamer." << std::endl;
|
||||||
|
#endif
|
||||||
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::~pqistreamer() Destruction!");
|
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::~pqistreamer() Destruction!");
|
||||||
|
|
||||||
if (mBio_flags & BIN_FLAGS_NO_CLOSE)
|
if (mBio_flags & BIN_FLAGS_NO_CLOSE)
|
||||||
@ -185,7 +205,7 @@ void pqistreamer::updateRates()
|
|||||||
{
|
{
|
||||||
int64_t diff = int64_t(t) - int64_t(mAvgLastUpdate) ;
|
int64_t diff = int64_t(t) - int64_t(mAvgLastUpdate) ;
|
||||||
|
|
||||||
float avgReadpSec = getRate(true) * PQISTREAM_AVG_FRAC + (1.0 - PQISTREAM_AVG_FRAC) * mAvgReadCount/(1000.0 * float(diff));
|
float avgReadpSec = getRate(true ) * PQISTREAM_AVG_FRAC + (1.0 - PQISTREAM_AVG_FRAC) * mAvgReadCount/(1000.0 * float(diff));
|
||||||
float avgSentpSec = getRate(false) * PQISTREAM_AVG_FRAC + (1.0 - PQISTREAM_AVG_FRAC) * mAvgSentCount/(1000.0 * float(diff));
|
float avgSentpSec = getRate(false) * PQISTREAM_AVG_FRAC + (1.0 - PQISTREAM_AVG_FRAC) * mAvgSentCount/(1000.0 * float(diff));
|
||||||
|
|
||||||
#ifdef DEBUG_PQISTREAMER
|
#ifdef DEBUG_PQISTREAMER
|
||||||
@ -256,6 +276,7 @@ int pqistreamer::tick_send(uint32_t timeout)
|
|||||||
{
|
{
|
||||||
handleoutgoing_locked();
|
handleoutgoing_locked();
|
||||||
}
|
}
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -278,7 +299,7 @@ int pqistreamer::status()
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void pqistreamer::locked_storeInOutputQueue(void *ptr,int)
|
void pqistreamer::locked_storeInOutputQueue(void *ptr,int,int)
|
||||||
{
|
{
|
||||||
mOutPkts.push_back(ptr);
|
mOutPkts.push_back(ptr);
|
||||||
}
|
}
|
||||||
@ -316,7 +337,7 @@ int pqistreamer::queue_outpqi_locked(RsItem *pqi,uint32_t& pktsize)
|
|||||||
|
|
||||||
if (mRsSerialiser->serialise(pqi, ptr, &pktsize))
|
if (mRsSerialiser->serialise(pqi, ptr, &pktsize))
|
||||||
{
|
{
|
||||||
locked_storeInOutputQueue(ptr,pqi->priority_level()) ;
|
locked_storeInOutputQueue(ptr,pktsize,pqi->priority_level()) ;
|
||||||
|
|
||||||
if (!(mBio_flags & BIN_FLAGS_NO_DELETE))
|
if (!(mBio_flags & BIN_FLAGS_NO_DELETE))
|
||||||
{
|
{
|
||||||
@ -399,6 +420,37 @@ time_t pqistreamer::getLastIncomingTS()
|
|||||||
return mLastIncomingTs;
|
return mLastIncomingTs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Packet slicing:
|
||||||
|
//
|
||||||
|
// Old : 02 0014 03 00000026 [data, 26 bytes] => [version 1B] [service 2B][subpacket 1B] [size 4B]
|
||||||
|
// New1: fv 0014 03 xxxxx sss [data, sss bytes] => [flags 0.5B version 0.5B] [service 2B][subpacket 1B] [packet counter 2.5B size 1.5B]
|
||||||
|
// New2: pp ff xxxxxxxx ssss [data, sss bytes] => [flags 1B] [protocol version 1B] [2^32 packet count] [2^16 size]
|
||||||
|
//
|
||||||
|
// Flags: 0x1 => incomplete packet continued after
|
||||||
|
// Flags: 0x2 => packet ending a previously incomplete packet
|
||||||
|
//
|
||||||
|
// - backward compatibility:
|
||||||
|
// * send one packet with service + subpacket = ffffff. Old peers will silently ignore such packets.
|
||||||
|
// * if received, mark the peer as able to decode the new packet type
|
||||||
|
//
|
||||||
|
// Mode 1:
|
||||||
|
// - Encode length on 1.5 Bytes (10 bits) => max slice size = 1024
|
||||||
|
// - Encode packet ID on 2.5 Bytes (20 bits) => packet counter = [0...1056364]
|
||||||
|
// Mode 2:
|
||||||
|
// - Encode protocol on 1.0 Bytes ( 8 bits)
|
||||||
|
// - Encode flags on 1.0 Bytes ( 8 bits)
|
||||||
|
// - Encode packet ID on 4.0 Bytes (32 bits) => packet counter = [0...2^32]
|
||||||
|
// - Encode size on 2.0 Bytes (16 bits) => 65536 // max slice size = 65536
|
||||||
|
//
|
||||||
|
// - limit packet grouping to max size 1024.
|
||||||
|
// - new peers need to read flux, and properly extract partial sizes, and combine packets based on packet counter.
|
||||||
|
// - on sending, RS should grab slices of max size 1024 from pqiQoS. If smaller, possibly pack them together.
|
||||||
|
// pqiQoS keeps track of sliced packets and makes sure the output is consistent:
|
||||||
|
// * when a large packet needs to be send, only takes a slice and return it, and update the remaining part
|
||||||
|
// * always consider priority when taking new slices => a newly arrived fast packet will always get through.
|
||||||
|
//
|
||||||
|
// Max slice size should be customisable, depending on bandwidth.
|
||||||
|
|
||||||
int pqistreamer::handleoutgoing_locked()
|
int pqistreamer::handleoutgoing_locked()
|
||||||
{
|
{
|
||||||
#ifdef DEBUG_PQISTREAMER
|
#ifdef DEBUG_PQISTREAMER
|
||||||
@ -417,6 +469,10 @@ int pqistreamer::handleoutgoing_locked()
|
|||||||
{
|
{
|
||||||
/* if we are not active - clear anything in the queues. */
|
/* if we are not active - clear anything in the queues. */
|
||||||
locked_clear_out_queue() ;
|
locked_clear_out_queue() ;
|
||||||
|
#ifdef DEBUG_PACKET_SLICING
|
||||||
|
std::cerr << "(II) Switching off packet slicing." << std::endl;
|
||||||
|
#endif
|
||||||
|
mAcceptsPacketSlicing = false ;
|
||||||
|
|
||||||
/* also remove the pending packets */
|
/* also remove the pending packets */
|
||||||
if (mPkt_wpending)
|
if (mPkt_wpending)
|
||||||
@ -440,57 +496,111 @@ int pqistreamer::handleoutgoing_locked()
|
|||||||
if ((!(mBio->cansend(0))) || (maxbytes < sentbytes))
|
if ((!(mBio->cansend(0))) || (maxbytes < sentbytes))
|
||||||
{
|
{
|
||||||
|
|
||||||
#ifdef DEBUG_TRANSFERS
|
#ifdef DEBUG_PACKET_SLICING
|
||||||
if (maxbytes < sentbytes)
|
if (maxbytes < sentbytes)
|
||||||
{
|
std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending: bio not ready. maxbytes=" << maxbytes << ", sentbytes=" << sentbytes << std::endl;
|
||||||
std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending sentbytes > maxbytes. Sent " << sentbytes << " bytes ";
|
|
||||||
std::cerr << std::endl;
|
|
||||||
}
|
|
||||||
else
|
else
|
||||||
{
|
std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending: sentbytes=" << sentbytes << ", max=" << maxbytes << std::endl;
|
||||||
std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending at cansend() is false";
|
|
||||||
std::cerr << std::endl;
|
|
||||||
}
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
#define GROUP_OUTGOING_PACKETS 1
|
// send a out_pkt., else send out_data. unless there is a pending packet. The strategy is to
|
||||||
#define PACKET_GROUPING_SIZE_LIMIT 512
|
// - grab as many packets as possible while below the optimal packet size, so as to allow some packing and decrease encryption padding overhead (suposeddly)
|
||||||
// send a out_pkt., else send out_data. unless
|
// - limit packets size to OPTIMAL_PACKET_SIZE when sending big packets so as to keep as much QoS as possible.
|
||||||
// there is a pending packet.
|
|
||||||
if (!mPkt_wpending)
|
if (!mPkt_wpending)
|
||||||
#ifdef GROUP_OUTGOING_PACKETS
|
|
||||||
{
|
{
|
||||||
void *dta;
|
void *dta;
|
||||||
mPkt_wpending_size = 0 ;
|
mPkt_wpending_size = 0 ;
|
||||||
int k=0;
|
int k=0;
|
||||||
|
|
||||||
while(mPkt_wpending_size < (uint32_t)maxbytes && mPkt_wpending_size < PACKET_GROUPING_SIZE_LIMIT && (dta = locked_pop_out_data())!=NULL )
|
// Checks for inserting a packet slicing probe. We do that to send the other peer the information that packet slicing can be used.
|
||||||
|
// if so, we enable it for the session. This should be removed (because it's unnecessary) when all users have switched to the new version.
|
||||||
|
time_t now = time(NULL) ;
|
||||||
|
|
||||||
|
if((!mAcceptsPacketSlicing) && now > mLastSentPacketSlicingProbe + PQISTREAM_PACKET_SLICING_PROBE_DELAY)
|
||||||
{
|
{
|
||||||
uint32_t s = getRsItemSize(dta);
|
#ifdef DEBUG_PACKET_SLICING
|
||||||
mPkt_wpending = realloc(mPkt_wpending,s+mPkt_wpending_size) ;
|
std::cerr << "(II) Inserting packet slicing probe in traffic" << std::endl;
|
||||||
memcpy( &((char*)mPkt_wpending)[mPkt_wpending_size],dta,s) ;
|
#endif
|
||||||
|
|
||||||
|
mPkt_wpending_size = 8 ;
|
||||||
|
mPkt_wpending = rs_malloc(8) ;
|
||||||
|
memcpy(mPkt_wpending,PACKET_SLICING_PROBE_BYTES,8) ;
|
||||||
|
|
||||||
|
mLastSentPacketSlicingProbe = now ;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint32_t slice_size=0;
|
||||||
|
bool slice_starts=true ;
|
||||||
|
bool slice_ends=true ;
|
||||||
|
uint32_t slice_packet_id=0 ;
|
||||||
|
|
||||||
|
do
|
||||||
|
{
|
||||||
|
int desired_packet_size = mAcceptsPacketSlicing?PQISTREAM_OPTIMAL_PACKET_SIZE:(getRsPktMaxSize());
|
||||||
|
|
||||||
|
dta = locked_pop_out_data(desired_packet_size,slice_size,slice_starts,slice_ends,slice_packet_id) ;
|
||||||
|
|
||||||
|
if(!dta)
|
||||||
|
break ;
|
||||||
|
|
||||||
|
if(slice_size > 0xffff)
|
||||||
|
{
|
||||||
|
std::cerr << "(EE) protocol error in pqitreamer: slice size is too large and cannot be encoded." ;
|
||||||
|
free(mPkt_wpending) ;
|
||||||
|
mPkt_wpending_size = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(slice_starts && slice_ends) // good old method. Send the packet as is, since it's a full packet.
|
||||||
|
{
|
||||||
|
#ifdef DEBUG_PACKET_SLICING
|
||||||
|
std::cerr << "sending full slice, old style. Size=" << slice_size << std::endl;
|
||||||
|
#endif
|
||||||
|
mPkt_wpending = realloc(mPkt_wpending,slice_size+mPkt_wpending_size) ;
|
||||||
|
memcpy( &((char*)mPkt_wpending)[mPkt_wpending_size],dta,slice_size) ;
|
||||||
free(dta);
|
free(dta);
|
||||||
mPkt_wpending_size += s ;
|
mPkt_wpending_size += slice_size ;
|
||||||
++k ;
|
++k ;
|
||||||
}
|
}
|
||||||
|
else // partial packet. We make a special header for it and insert it in the stream
|
||||||
|
{
|
||||||
|
#ifdef DEBUG_PACKET_SLICING
|
||||||
|
std::cerr << "sending partial slice, packet ID=" << std::hex << slice_packet_id << std::dec << ", size=" << slice_size << std::endl;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
mPkt_wpending = realloc(mPkt_wpending,slice_size+mPkt_wpending_size+PQISTREAM_PARTIAL_PACKET_HEADER_SIZE) ;
|
||||||
|
memcpy( &((char*)mPkt_wpending)[mPkt_wpending_size+PQISTREAM_PARTIAL_PACKET_HEADER_SIZE],dta,slice_size) ;
|
||||||
|
free(dta);
|
||||||
|
|
||||||
|
// New2: pp ff xxxxxxxx ssss [data, sss bytes] => [flags 1B] [protocol version 1B] [2^32 packet count] [2^16 size]
|
||||||
|
|
||||||
|
uint8_t partial_flags = 0 ;
|
||||||
|
if(slice_starts) partial_flags |= PQISTREAM_SLICE_FLAG_STARTS ;
|
||||||
|
if(slice_ends ) partial_flags |= PQISTREAM_SLICE_FLAG_ENDS ;
|
||||||
|
|
||||||
|
((char*)mPkt_wpending)[mPkt_wpending_size+0x00] = PQISTREAM_SLICE_PROTOCOL_VERSION_ID_01 ;
|
||||||
|
((char*)mPkt_wpending)[mPkt_wpending_size+0x01] = partial_flags ;
|
||||||
|
((char*)mPkt_wpending)[mPkt_wpending_size+0x02] = uint8_t(slice_packet_id >> 24) & 0xff ;
|
||||||
|
((char*)mPkt_wpending)[mPkt_wpending_size+0x03] = uint8_t(slice_packet_id >> 16) & 0xff ;
|
||||||
|
((char*)mPkt_wpending)[mPkt_wpending_size+0x04] = uint8_t(slice_packet_id >> 8) & 0xff ;
|
||||||
|
((char*)mPkt_wpending)[mPkt_wpending_size+0x05] = uint8_t(slice_packet_id >> 0) & 0xff ;
|
||||||
|
((char*)mPkt_wpending)[mPkt_wpending_size+0x06] = uint8_t(slice_size >> 8) & 0xff ;
|
||||||
|
((char*)mPkt_wpending)[mPkt_wpending_size+0x07] = uint8_t(slice_size >> 0) & 0xff ;
|
||||||
|
|
||||||
|
mPkt_wpending_size += slice_size + PQISTREAM_PARTIAL_PACKET_HEADER_SIZE;
|
||||||
|
++k ;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
while(mPkt_wpending_size < (uint32_t)maxbytes && mPkt_wpending_size < PQISTREAM_OPTIMAL_PACKET_SIZE ) ;
|
||||||
|
|
||||||
#ifdef DEBUG_PQISTREAMER
|
#ifdef DEBUG_PQISTREAMER
|
||||||
if(k > 1)
|
if(k > 1)
|
||||||
std::cerr << "Packed " << k << " packets into " << mPkt_wpending_size << " bytes." << std::endl;
|
std::cerr << "Packed " << k << " packets into " << mPkt_wpending_size << " bytes." << std::endl;
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
#else
|
|
||||||
{
|
|
||||||
void *dta = locked_pop_out_data() ;
|
|
||||||
|
|
||||||
if(dta != NULL)
|
|
||||||
{
|
|
||||||
mPkt_wpending = dta ;
|
|
||||||
mPkt_wpending_size = getRsItemSize(dta);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
if (mPkt_wpending)
|
if (mPkt_wpending)
|
||||||
{
|
{
|
||||||
// write packet.
|
// write packet.
|
||||||
@ -507,12 +617,19 @@ int pqistreamer::handleoutgoing_locked()
|
|||||||
// std::cerr << out << std::endl ;
|
// std::cerr << out << std::endl ;
|
||||||
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out);
|
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out);
|
||||||
#endif
|
#endif
|
||||||
|
std::cerr << PeerId() << ": sending failed. Only " << ss << " bytes sent over " << mPkt_wpending_size << std::endl;
|
||||||
|
|
||||||
// pkt_wpending will kept til next time.
|
// pkt_wpending will kept til next time.
|
||||||
// ensuring exactly the same data is written (openSSL requirement).
|
// ensuring exactly the same data is written (openSSL requirement).
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
#ifdef DEBUG_PQISTREAMER
|
||||||
|
else
|
||||||
|
std::cerr << PeerId() << ": sent " << ss << " bytes " << std::endl;
|
||||||
|
#endif
|
||||||
|
|
||||||
++nsent;
|
++nsent;
|
||||||
|
|
||||||
outSentBytes_locked(mPkt_wpending_size); // this is the only time where we know exactly what was sent.
|
outSentBytes_locked(mPkt_wpending_size); // this is the only time where we know exactly what was sent.
|
||||||
|
|
||||||
#ifdef DEBUG_TRANSFERS
|
#ifdef DEBUG_TRANSFERS
|
||||||
@ -562,7 +679,7 @@ int pqistreamer::handleincoming_locked()
|
|||||||
void *block = mPkt_rpending;
|
void *block = mPkt_rpending;
|
||||||
|
|
||||||
// initial read size: basic packet.
|
// initial read size: basic packet.
|
||||||
int blen = getRsPktBaseSize();
|
int blen = getRsPktBaseSize(); // this is valid for both packet slices and normal un-sliced packets (same header size)
|
||||||
|
|
||||||
int maxin = inAllowedBytes_locked();
|
int maxin = inAllowedBytes_locked();
|
||||||
|
|
||||||
@ -623,33 +740,56 @@ start_packet_read:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
#ifdef DEBUG_PQISTREAMER
|
#ifdef DEBUG_PQISTREAMER
|
||||||
std::cerr << "[" << (void*)pthread_self() << "] " << "block 0 : " << (int)(((unsigned char*)block)[0]) << " " << (int)(((unsigned char*)block)[1]) << " " << (int)(((unsigned char*)block)[2]) << " "
|
std::cerr << "[" << (void*)pthread_self() << "] " << "block 0 : " << RsUtil::BinToHex(block,8) << std::endl;
|
||||||
<< (int)(((unsigned char*)block)[3]) << " "
|
|
||||||
<< (int)(((unsigned char*)block)[4]) << " "
|
|
||||||
<< (int)(((unsigned char*)block)[5]) << " "
|
|
||||||
<< (int)(((unsigned char*)block)[6]) << " "
|
|
||||||
<< (int)(((unsigned char*)block)[7]) << " " << std::endl ;
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
readbytes += blen;
|
readbytes += blen;
|
||||||
mReading_state = reading_state_packet_started ;
|
mReading_state = reading_state_packet_started ;
|
||||||
mFailed_read_attempts = 0 ; // reset failed read, as the packet has been totally read.
|
mFailed_read_attempts = 0 ; // reset failed read, as the packet has been totally read.
|
||||||
|
|
||||||
|
// Check for packet slicing probe (04/26/2016). To be removed when everyone uses it.
|
||||||
|
|
||||||
|
if(!memcmp(block,PACKET_SLICING_PROBE_BYTES,8))
|
||||||
|
{
|
||||||
|
mAcceptsPacketSlicing = true ;
|
||||||
|
#ifdef DEBUG_PACKET_SLICING
|
||||||
|
std::cerr << "(II) Enabling packet slicing!" << std::endl;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
}
|
}
|
||||||
continue_packet:
|
continue_packet:
|
||||||
{
|
{
|
||||||
// workout how much more to read.
|
// workout how much more to read.
|
||||||
int extralen = getRsItemSize(block) - blen;
|
|
||||||
|
bool is_partial_packet = false ;
|
||||||
|
bool is_packet_starting = (((char*)block)[1] == PQISTREAM_SLICE_FLAG_STARTS) ; // STARTS and ENDS flags are actually never combined.
|
||||||
|
bool is_packet_ending = (((char*)block)[1] == PQISTREAM_SLICE_FLAG_ENDS) ;
|
||||||
|
bool is_packet_middle = (((char*)block)[1] == 0x00) ;
|
||||||
|
|
||||||
|
uint32_t extralen =0;
|
||||||
|
uint32_t slice_packet_id =0;
|
||||||
|
|
||||||
|
if( ((char*)block)[0] == PQISTREAM_SLICE_PROTOCOL_VERSION_ID_01 && ( is_packet_starting || is_packet_middle || is_packet_ending))
|
||||||
|
{
|
||||||
|
extralen = (uint32_t(((uint8_t*)block)[6]) << 8 ) + (uint32_t(((uint8_t*)block)[7]));
|
||||||
|
slice_packet_id = (uint32_t(((uint8_t*)block)[2]) << 24) + (uint32_t(((uint8_t*)block)[3]) << 16) + (uint32_t(((uint8_t*)block)[4]) << 8) + (uint32_t(((uint8_t*)block)[5]) << 0);
|
||||||
|
|
||||||
|
#ifdef DEBUG_PACKET_SLICING
|
||||||
|
std::cerr << "Reading partial packet from mem block " << RsUtil::BinToHex((char*)block,8) << ": packet_id=" << std::hex << slice_packet_id << std::dec << ", len=" << extralen << std::endl;
|
||||||
|
#endif
|
||||||
|
is_partial_packet = true ;
|
||||||
|
|
||||||
|
mAcceptsPacketSlicing = true ; // this is needed
|
||||||
|
}
|
||||||
|
else
|
||||||
|
extralen = getRsItemSize(block) - blen; // old style packet type
|
||||||
|
|
||||||
#ifdef DEBUG_PQISTREAMER
|
#ifdef DEBUG_PQISTREAMER
|
||||||
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet getRsItemSize(block) = " << getRsItemSize(block) << std::endl ;
|
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet getRsItemSize(block) = " << getRsItemSize(block) << std::endl ;
|
||||||
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet extralen = " << extralen << std::endl ;
|
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet extralen = " << extralen << std::endl ;
|
||||||
|
|
||||||
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << mReading_state << std::endl ;
|
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << mReading_state << std::endl ;
|
||||||
std::cerr << "[" << (void*)pthread_self() << "] " << "block 1 : " << (int)(((unsigned char*)block)[0]) << " " << (int)(((unsigned char*)block)[1]) << " " << (int)(((unsigned char*)block)[2]) << " " << (int)(((unsigned char*)block)[3]) << " "
|
std::cerr << "[" << (void*)pthread_self() << "] " << "block 1 : " << RsUtil::BinToHex(block,8) << std::endl;
|
||||||
<< (int)(((unsigned char*)block)[4]) << " "
|
|
||||||
<< (int)(((unsigned char*)block)[5]) << " "
|
|
||||||
<< (int)(((unsigned char*)block)[6]) << " "
|
|
||||||
<< (int)(((unsigned char*)block)[7]) << " " << std::endl ;
|
|
||||||
#endif
|
#endif
|
||||||
if (extralen > maxlen - blen)
|
if (extralen > maxlen - blen)
|
||||||
{
|
{
|
||||||
@ -670,15 +810,9 @@ continue_packet:
|
|||||||
msg += "\n";
|
msg += "\n";
|
||||||
rs_sprintf_append(msg, "(M:%d B:%d E:%d)\n", maxlen, blen, extralen);
|
rs_sprintf_append(msg, "(M:%d B:%d E:%d)\n", maxlen, blen, extralen);
|
||||||
msg += "\n";
|
msg += "\n";
|
||||||
rs_sprintf_append(msg, "block = %d %d %d %d %d %d %d %d\n",
|
msg += "block = " ;
|
||||||
(int)(((unsigned char*)block)[0]),
|
msg += RsUtil::BinToHex((char*)block,8);
|
||||||
(int)(((unsigned char*)block)[1]),
|
|
||||||
(int)(((unsigned char*)block)[2]),
|
|
||||||
(int)(((unsigned char*)block)[3]),
|
|
||||||
(int)(((unsigned char*)block)[4]),
|
|
||||||
(int)(((unsigned char*)block)[5]),
|
|
||||||
(int)(((unsigned char*)block)[6]),
|
|
||||||
(int)(((unsigned char*)block)[7])) ;
|
|
||||||
msg += "\n";
|
msg += "\n";
|
||||||
msg += "Please get your friends to upgrade to the latest version";
|
msg += "Please get your friends to upgrade to the latest version";
|
||||||
msg += "\n";
|
msg += "\n";
|
||||||
@ -745,17 +879,8 @@ continue_packet:
|
|||||||
msgout += "If it happens manny time, please contact the developers, and send them these numbers:";
|
msgout += "If it happens manny time, please contact the developers, and send them these numbers:";
|
||||||
msgout += "\n";
|
msgout += "\n";
|
||||||
|
|
||||||
rs_sprintf_append(msgout, "block = %d %d %d %d %d %d %d %d\n",
|
msgout += "block = " ;
|
||||||
(int)(((unsigned char*)block)[0]),
|
msgout += RsUtil::BinToHex((char*)block,8) + "\n" ;
|
||||||
(int)(((unsigned char*)block)[1]),
|
|
||||||
(int)(((unsigned char*)block)[2]),
|
|
||||||
(int)(((unsigned char*)block)[3]),
|
|
||||||
(int)(((unsigned char*)block)[4]),
|
|
||||||
(int)(((unsigned char*)block)[5]),
|
|
||||||
(int)(((unsigned char*)block)[6]),
|
|
||||||
(int)(((unsigned char*)block)[7]));
|
|
||||||
|
|
||||||
//notify->AddSysMessage(0, RS_SYS_WARNING, title, msgout.str());
|
|
||||||
|
|
||||||
std::cerr << msgout << std::endl;
|
std::cerr << msgout << std::endl;
|
||||||
}
|
}
|
||||||
@ -776,11 +901,7 @@ continue_packet:
|
|||||||
}
|
}
|
||||||
#ifdef DEBUG_PQISTREAMER
|
#ifdef DEBUG_PQISTREAMER
|
||||||
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << mReading_state << std::endl ;
|
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << mReading_state << std::endl ;
|
||||||
std::cerr << "[" << (void*)pthread_self() << "] " << "block 2 : " << (int)(((unsigned char*)extradata)[0]) << " " << (int)(((unsigned char*)extradata)[1]) << " " << (int)(((unsigned char*)extradata)[2]) << " " << (int)(((unsigned char*)extradata)[3]) << " "
|
std::cerr << "[" << (void*)pthread_self() << "] " << "block 2 : " << RsUtil::BinToHex(extradata,8) << std::endl;
|
||||||
<< (int)(((unsigned char*)extradata)[4]) << " "
|
|
||||||
<< (int)(((unsigned char*)extradata)[5]) << " "
|
|
||||||
<< (int)(((unsigned char*)extradata)[6]) << " "
|
|
||||||
<< (int)(((unsigned char*)extradata)[7]) << " " << std::endl ;
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
mFailed_read_attempts = 0 ;
|
mFailed_read_attempts = 0 ;
|
||||||
@ -797,14 +918,21 @@ continue_packet:
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// std::cerr << "Deserializing packet of size " << pktlen <<std::endl ;
|
|
||||||
|
|
||||||
uint32_t pktlen = blen+extralen ;
|
uint32_t pktlen = blen+extralen ;
|
||||||
#ifdef DEBUG_PQISTREAMER
|
#ifdef DEBUG_PQISTREAMER
|
||||||
std::cerr << "[" << (void*)pthread_self() << "] " << "deserializing. Size=" << pktlen << std::endl ;
|
std::cerr << "[" << (void*)pthread_self() << "] " << RsUtil::BinToHex((char*)block,8) << "...: deserializing. Size=" << pktlen << std::endl ;
|
||||||
#endif
|
#endif
|
||||||
|
RsItem *pkt ;
|
||||||
|
|
||||||
RsItem *pkt = mRsSerialiser->deserialise(block, &pktlen);
|
if(is_partial_packet)
|
||||||
|
{
|
||||||
|
#ifdef DEBUG_PACKET_SLICING
|
||||||
|
std::cerr << "Inputing partial packet " << RsUtil::BinToHex((char*)block,8) << std::endl;
|
||||||
|
#endif
|
||||||
|
pkt = addPartialPacket(block,pktlen,slice_packet_id,is_packet_starting,is_packet_ending) ;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
pkt = mRsSerialiser->deserialise(block, &pktlen);
|
||||||
|
|
||||||
if ((pkt != NULL) && (0 < handleincomingitem_locked(pkt,pktlen)))
|
if ((pkt != NULL) && (0 < handleincomingitem_locked(pkt,pktlen)))
|
||||||
{
|
{
|
||||||
@ -813,7 +941,7 @@ continue_packet:
|
|||||||
#endif
|
#endif
|
||||||
inReadBytes_locked(pktlen); // only count deserialised packets, because that's what is actually been transfered.
|
inReadBytes_locked(pktlen); // only count deserialised packets, because that's what is actually been transfered.
|
||||||
}
|
}
|
||||||
else
|
else if (!is_partial_packet)
|
||||||
{
|
{
|
||||||
#ifdef DEBUG_PQISTREAMER
|
#ifdef DEBUG_PQISTREAMER
|
||||||
pqioutput(PQL_ALERT, pqistreamerzone, "Failed to handle Packet!");
|
pqioutput(PQL_ALERT, pqistreamerzone, "Failed to handle Packet!");
|
||||||
@ -844,6 +972,91 @@ continue_packet:
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RsItem *pqistreamer::addPartialPacket(const void *block,uint32_t len,uint32_t slice_packet_id,bool is_packet_starting,bool is_packet_ending)
|
||||||
|
{
|
||||||
|
#ifdef DEBUG_PACKET_SLICING
|
||||||
|
std::cerr << "Receiving partial packet. size=" << len << ", ID=" << std::hex << slice_packet_id << std::dec << ", starting:" << is_packet_starting << ", ending:" << is_packet_ending ;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
if(is_packet_starting && is_packet_ending)
|
||||||
|
{
|
||||||
|
std::cerr << " (EE) unexpected situation: both starting and ending" << std::endl;
|
||||||
|
return NULL ;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint32_t slice_length = len - PQISTREAM_PARTIAL_PACKET_HEADER_SIZE ;
|
||||||
|
unsigned char *slice_data = &((unsigned char*)block)[PQISTREAM_PARTIAL_PACKET_HEADER_SIZE] ;
|
||||||
|
|
||||||
|
std::map<uint32_t,PartialPacketRecord>::iterator it = mPartialPackets.find(slice_packet_id) ;
|
||||||
|
|
||||||
|
if(it == mPartialPackets.end())
|
||||||
|
{
|
||||||
|
// make sure we really have a starting packet. Otherwise this is an error.
|
||||||
|
|
||||||
|
if(!is_packet_starting)
|
||||||
|
{
|
||||||
|
std::cerr << " (EE) non starting packet has no record. Dropping" << std::endl;
|
||||||
|
return NULL ;
|
||||||
|
}
|
||||||
|
PartialPacketRecord& rec = mPartialPackets[slice_packet_id] ;
|
||||||
|
|
||||||
|
rec.mem = rs_malloc(slice_length) ;
|
||||||
|
|
||||||
|
if(!rec.mem)
|
||||||
|
{
|
||||||
|
std::cerr << " (EE) Cannot allocate memory for slice of size " << slice_length << std::endl;
|
||||||
|
return NULL ;
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(rec.mem, slice_data, slice_length) ; ;
|
||||||
|
rec.size = slice_length ;
|
||||||
|
|
||||||
|
#ifdef DEBUG_PACKET_SLICING
|
||||||
|
std::cerr << " => stored in new record (size=" << rec.size << std::endl;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
return NULL ; // no need to check for ending
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
PartialPacketRecord& rec = it->second ;
|
||||||
|
|
||||||
|
if(is_packet_starting)
|
||||||
|
{
|
||||||
|
std::cerr << "(WW) dropping unfinished existing packet that gets to be replaced by new starting packet." << std::endl;
|
||||||
|
free(rec.mem);
|
||||||
|
rec.size = 0 ;
|
||||||
|
}
|
||||||
|
// make sure this is a continuing packet, otherwise this is an error.
|
||||||
|
|
||||||
|
rec.mem = realloc(rec.mem, rec.size + slice_length) ;
|
||||||
|
memcpy( &((char*)rec.mem)[rec.size],slice_data,slice_length) ;
|
||||||
|
rec.size += slice_length ;
|
||||||
|
|
||||||
|
#ifdef DEBUG_PACKET_SLICING
|
||||||
|
std::cerr << " => added to existing record size=" << rec.size ;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
if(is_packet_ending)
|
||||||
|
{
|
||||||
|
#ifdef DEBUG_PACKET_SLICING
|
||||||
|
std::cerr << " => deserialising: mem=" << RsUtil::BinToHex((char*)rec.mem,std::min(8u,rec.size)) << std::endl;
|
||||||
|
#endif
|
||||||
|
RsItem *item = mRsSerialiser->deserialise(rec.mem, &rec.size);
|
||||||
|
|
||||||
|
free(rec.mem) ;
|
||||||
|
mPartialPackets.erase(it) ;
|
||||||
|
return item ;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
#ifdef DEBUG_PACKET_SLICING
|
||||||
|
std::cerr << std::endl;
|
||||||
|
#endif
|
||||||
|
return NULL ;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* BandWidth Management Assistance */
|
/* BandWidth Management Assistance */
|
||||||
|
|
||||||
@ -1074,8 +1287,13 @@ int pqistreamer::locked_gatherStatistics(std::list<RSTrafficClue>& out_lst,std::
|
|||||||
return 1 ;
|
return 1 ;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *pqistreamer::locked_pop_out_data()
|
void *pqistreamer::locked_pop_out_data(uint32_t max_slice_size,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id)
|
||||||
{
|
{
|
||||||
|
size = 0 ;
|
||||||
|
starts = true ;
|
||||||
|
ends = true ;
|
||||||
|
packet_id = 0 ;
|
||||||
|
|
||||||
void *res = NULL ;
|
void *res = NULL ;
|
||||||
|
|
||||||
if (!mOutPkts.empty())
|
if (!mOutPkts.empty())
|
||||||
@ -1089,3 +1307,5 @@ void *pqistreamer::locked_pop_out_data()
|
|||||||
}
|
}
|
||||||
return res ;
|
return res ;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -40,6 +40,12 @@
|
|||||||
// The interface does not handle connection, just communication.
|
// The interface does not handle connection, just communication.
|
||||||
// possible bioflags: BIN_FLAGS_NO_CLOSE | BIN_FLAGS_NO_DELETE
|
// possible bioflags: BIN_FLAGS_NO_CLOSE | BIN_FLAGS_NO_DELETE
|
||||||
|
|
||||||
|
struct PartialPacketRecord
|
||||||
|
{
|
||||||
|
void *mem ;
|
||||||
|
uint32_t size ;
|
||||||
|
};
|
||||||
|
|
||||||
class pqistreamer: public PQInterface
|
class pqistreamer: public PQInterface
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@ -71,12 +77,11 @@ class pqistreamer: public PQInterface
|
|||||||
|
|
||||||
// These methods are redefined in pqiQoSstreamer
|
// These methods are redefined in pqiQoSstreamer
|
||||||
//
|
//
|
||||||
virtual void locked_storeInOutputQueue(void *ptr,int priority) ;
|
virtual void locked_storeInOutputQueue(void *ptr, int size, int priority) ;
|
||||||
virtual int locked_out_queue_size() const ;
|
virtual int locked_out_queue_size() const ;
|
||||||
virtual void locked_clear_out_queue() ;
|
virtual void locked_clear_out_queue() ;
|
||||||
virtual int locked_compute_out_pkt_size() const ;
|
virtual int locked_compute_out_pkt_size() const ;
|
||||||
virtual void *locked_pop_out_data() ;
|
virtual void *locked_pop_out_data(uint32_t max_slice_size,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id);
|
||||||
//virtual int locked_gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const; // extracting data.
|
|
||||||
virtual int locked_gatherStatistics(std::list<RSTrafficClue>& outqueue_stats,std::list<RSTrafficClue>& inqueue_stats); // extracting data.
|
virtual int locked_gatherStatistics(std::list<RSTrafficClue>& outqueue_stats,std::list<RSTrafficClue>& inqueue_stats); // extracting data.
|
||||||
|
|
||||||
void updateRates() ;
|
void updateRates() ;
|
||||||
@ -156,7 +161,12 @@ class pqistreamer: public PQInterface
|
|||||||
std::list<RSTrafficClue> mCurrentStatsChunk_Out ;
|
std::list<RSTrafficClue> mCurrentStatsChunk_Out ;
|
||||||
time_t mStatisticsTimeStamp ;
|
time_t mStatisticsTimeStamp ;
|
||||||
|
|
||||||
|
bool mAcceptsPacketSlicing ;
|
||||||
|
time_t mLastSentPacketSlicingProbe ;
|
||||||
void locked_addTrafficClue(const RsItem *pqi, uint32_t pktsize, std::list<RSTrafficClue> &lst);
|
void locked_addTrafficClue(const RsItem *pqi, uint32_t pktsize, std::list<RSTrafficClue> &lst);
|
||||||
|
RsItem *addPartialPacket(const void *block, uint32_t len, uint32_t slice_packet_id,bool packet_starting,bool packet_ending);
|
||||||
|
|
||||||
|
std::map<uint32_t,PartialPacketRecord> mPartialPackets ;
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif //MRK_PQI_STREAMER_HEADER
|
#endif //MRK_PQI_STREAMER_HEADER
|
||||||
|
@ -69,8 +69,12 @@ static double getCurrentTS()
|
|||||||
return cts;
|
return cts;
|
||||||
}
|
}
|
||||||
|
|
||||||
const double RsServer::minTimeDelta = 0.1; // 25;
|
// These values should be tunable from the GUI, to offer a compromise between speed and CPU use.
|
||||||
const double RsServer::maxTimeDelta = 0.5;
|
// In some cases (VOIP) it's likely that we will need to set them temporarily to a very low
|
||||||
|
// value, in order to favor a fast feedback
|
||||||
|
|
||||||
|
const double RsServer::minTimeDelta = 0.05; // 25;
|
||||||
|
const double RsServer::maxTimeDelta = 0.2;
|
||||||
const double RsServer::kickLimit = 0.15;
|
const double RsServer::kickLimit = 0.15;
|
||||||
|
|
||||||
|
|
||||||
|
@ -96,6 +96,9 @@ const uint16_t RS_SERVICE_TYPE_PLUGIN_ARADO_ID = 0x2001;
|
|||||||
const uint16_t RS_SERVICE_TYPE_PLUGIN_QCHESS_ID = 0x2002;
|
const uint16_t RS_SERVICE_TYPE_PLUGIN_QCHESS_ID = 0x2002;
|
||||||
const uint16_t RS_SERVICE_TYPE_PLUGIN_FEEDREADER = 0x2003;
|
const uint16_t RS_SERVICE_TYPE_PLUGIN_FEEDREADER = 0x2003;
|
||||||
|
|
||||||
|
// Reserved for packet slicing probes.
|
||||||
|
const uint16_t RS_SERVICE_TYPE_PACKET_SLICING_PROBE = 0xAABB;
|
||||||
|
|
||||||
// Nabu's services.
|
// Nabu's services.
|
||||||
const uint16_t RS_SERVICE_TYPE_PLUGIN_FIDO_GW = 0xF1D0;
|
const uint16_t RS_SERVICE_TYPE_PLUGIN_FIDO_GW = 0xF1D0;
|
||||||
const uint16_t RS_SERVICE_TYPE_PLUGIN_ZERORESERVE = 0xBEEF;
|
const uint16_t RS_SERVICE_TYPE_PLUGIN_ZERORESERVE = 0xBEEF;
|
||||||
|
@ -23,6 +23,8 @@
|
|||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include <iomanip>
|
||||||
|
|
||||||
#include "util/rsdir.h"
|
#include "util/rsdir.h"
|
||||||
#include "retroshare/rsiface.h"
|
#include "retroshare/rsiface.h"
|
||||||
#include "pqi/pqibin.h"
|
#include "pqi/pqibin.h"
|
||||||
@ -38,7 +40,6 @@
|
|||||||
* #define DEBUG_RTT 1
|
* #define DEBUG_RTT 1
|
||||||
****/
|
****/
|
||||||
|
|
||||||
|
|
||||||
/* DEFINE INTERFACE POINTER! */
|
/* DEFINE INTERFACE POINTER! */
|
||||||
RsRtt *rsRtt = NULL;
|
RsRtt *rsRtt = NULL;
|
||||||
|
|
||||||
@ -168,7 +169,7 @@ int p3rtt::sendPackets()
|
|||||||
pt = mSentPingTime;
|
pt = mSentPingTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (now - pt > RTT_PING_PERIOD)
|
if (now >= pt+RTT_PING_PERIOD)
|
||||||
{
|
{
|
||||||
sendPingMeasurements();
|
sendPingMeasurements();
|
||||||
|
|
||||||
@ -190,19 +191,10 @@ void p3rtt::sendPingMeasurements()
|
|||||||
|
|
||||||
mServiceCtrl->getPeersConnected(getServiceInfo().mServiceType, idList);
|
mServiceCtrl->getPeersConnected(getServiceInfo().mServiceType, idList);
|
||||||
|
|
||||||
#ifdef DEBUG_RTT
|
|
||||||
std::cerr << "p3rtt::sendPingMeasurements() @ts: " << ts;
|
|
||||||
std::cerr << std::endl;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
/* prepare packets */
|
/* prepare packets */
|
||||||
std::set<RsPeerId>::iterator it;
|
std::set<RsPeerId>::iterator it;
|
||||||
for(it = idList.begin(); it != idList.end(); ++it)
|
for(it = idList.begin(); it != idList.end(); ++it)
|
||||||
{
|
{
|
||||||
#ifdef DEBUG_RTT
|
|
||||||
std::cerr << "p3rtt::sendPingMeasurements() Pinging: " << *it;
|
|
||||||
std::cerr << std::endl;
|
|
||||||
#endif
|
|
||||||
double ts = getCurrentTS();
|
double ts = getCurrentTS();
|
||||||
|
|
||||||
/* create the packet */
|
/* create the packet */
|
||||||
@ -214,11 +206,8 @@ void p3rtt::sendPingMeasurements()
|
|||||||
storePingAttempt(*it, ts, mCounter);
|
storePingAttempt(*it, ts, mCounter);
|
||||||
|
|
||||||
#ifdef DEBUG_RTT
|
#ifdef DEBUG_RTT
|
||||||
std::cerr << "p3rtt::sendPingMeasurements() With Packet:";
|
std::cerr << "p3rtt::sendPingMeasurements() Pinging: " << *it << " [" << pingPkt->mSeqNo << "," << std::hex << pingPkt->mPingTS << std::dec << "]" << std::endl;;
|
||||||
std::cerr << std::endl;
|
|
||||||
pingPkt->print(std::cerr, 10);
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
sendItem(pingPkt);
|
sendItem(pingPkt);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -256,30 +245,28 @@ int p3rtt::handlePing(RsItem *item)
|
|||||||
/* cast to right type */
|
/* cast to right type */
|
||||||
RsRttPingItem *ping = (RsRttPingItem *) item;
|
RsRttPingItem *ping = (RsRttPingItem *) item;
|
||||||
|
|
||||||
|
double ts = getCurrentTS();
|
||||||
#ifdef DEBUG_RTT
|
#ifdef DEBUG_RTT
|
||||||
std::cerr << "p3rtt::handlePing() Recvd Packet from: " << ping->PeerId();
|
std::cerr << "p3rtt::handlePing() from: " << ping->PeerId() << " - [" << ping->mSeqNo << "," << std::hex << ping->mPingTS << std::dec << "] " << std::endl;
|
||||||
std::cerr << std::endl;
|
std::cerr << "incoming ping travel time: " << ts - convert64bitsToTs(ping->mPingTS) << std::endl;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/* with a ping, we just respond as quickly as possible - they do all the analysis */
|
/* with a ping, we just respond as quickly as possible - they do all the analysis */
|
||||||
RsRttPongItem *pong = new RsRttPongItem();
|
RsRttPongItem *pong = new RsRttPongItem();
|
||||||
|
|
||||||
|
|
||||||
pong->PeerId(ping->PeerId());
|
pong->PeerId(ping->PeerId());
|
||||||
pong->mPingTS = ping->mPingTS;
|
pong->mPingTS = ping->mPingTS;
|
||||||
pong->mSeqNo = ping->mSeqNo;
|
pong->mSeqNo = ping->mSeqNo;
|
||||||
|
|
||||||
// add our timestamp.
|
// add our timestamp.
|
||||||
double ts = getCurrentTS();
|
|
||||||
pong->mPongTS = convertTsTo64bits(ts);
|
pong->mPongTS = convertTsTo64bits(ts);
|
||||||
|
|
||||||
|
static double mLastResponseToPong = 0.0 ;// bad stuff
|
||||||
#ifdef DEBUG_RTT
|
#ifdef DEBUG_RTT
|
||||||
std::cerr << "p3rtt::handlePing() With Packet:";
|
std::cerr << "Delay since last response to PONG: " << ts - mLastResponseToPong << std::endl;
|
||||||
std::cerr << std::endl;
|
|
||||||
pong->print(std::cerr, 10);
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
mLastResponseToPong = ts ;
|
||||||
sendItem(pong);
|
sendItem(pong);
|
||||||
return true ;
|
return true ;
|
||||||
}
|
}
|
||||||
@ -291,9 +278,7 @@ int p3rtt::handlePong(RsItem *item)
|
|||||||
RsRttPongItem *pong = (RsRttPongItem *) item;
|
RsRttPongItem *pong = (RsRttPongItem *) item;
|
||||||
|
|
||||||
#ifdef DEBUG_RTT
|
#ifdef DEBUG_RTT
|
||||||
std::cerr << "p3rtt::handlePong() Recvd Packet from: " << pong->PeerId();
|
std::cerr << "p3rtt::handlePong() from: " << pong->PeerId() << " - [" << pong->mSeqNo << "," << std::hex << pong->mPingTS << " -> " << pong->mPongTS << std::dec << "] "<< std::endl;
|
||||||
std::cerr << std::endl;
|
|
||||||
pong->print(std::cerr, 10);
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/* with a pong, we do the maths! */
|
/* with a pong, we do the maths! */
|
||||||
@ -305,21 +290,12 @@ int p3rtt::handlePong(RsItem *item)
|
|||||||
double offset = pongTS - (recvTS - rtt / 2.0); // so to get to their time, we go ourTS + offset.
|
double offset = pongTS - (recvTS - rtt / 2.0); // so to get to their time, we go ourTS + offset.
|
||||||
|
|
||||||
#ifdef DEBUG_RTT
|
#ifdef DEBUG_RTT
|
||||||
std::cerr << "p3rtt::handlePong() Timing:";
|
std::cerr << "incoming pong travel time: " << recvTS - convert64bitsToTs(pong->mPongTS) << std::endl;
|
||||||
std::cerr << std::endl;
|
std::cerr << " RTT analysis: pingTS: " << std::setprecision(16) << pingTS << ", pongTS: " << pongTS
|
||||||
std::cerr << "\tpingTS: " << pingTS;
|
<< ", recvTS: " << std::setprecision(16) << recvTS << " ==> rtt: " << rtt << ", offset: " << offset << std::endl;
|
||||||
std::cerr << std::endl;
|
|
||||||
std::cerr << "\tpongTS: " << pongTS;
|
|
||||||
std::cerr << std::endl;
|
|
||||||
std::cerr << "\trecvTS: " << recvTS;
|
|
||||||
std::cerr << std::endl;
|
|
||||||
std::cerr << "\t ==> rtt: " << rtt;
|
|
||||||
std::cerr << std::endl;
|
|
||||||
std::cerr << "\t ==> offset: " << offset;
|
|
||||||
std::cerr << std::endl;
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
storePongResult(pong->PeerId(), pong->mSeqNo, pingTS, rtt, offset);
|
storePongResult(pong->PeerId(), pong->mSeqNo, recvTS, rtt, offset);
|
||||||
return true ;
|
return true ;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -333,6 +309,9 @@ int p3rtt::storePingAttempt(const RsPeerId& id, double ts, uint32_t seqno)
|
|||||||
/* find corresponding local data */
|
/* find corresponding local data */
|
||||||
RttPeerInfo *peerInfo = locked_GetPeerInfo(id);
|
RttPeerInfo *peerInfo = locked_GetPeerInfo(id);
|
||||||
|
|
||||||
|
#ifdef DEBUG_RTT
|
||||||
|
std::cerr << "Delay since previous ping attempt: " << ts - peerInfo->mCurrentPingTS << std::endl;
|
||||||
|
#endif
|
||||||
peerInfo->mCurrentPingTS = ts;
|
peerInfo->mCurrentPingTS = ts;
|
||||||
peerInfo->mCurrentPingCounter = seqno;
|
peerInfo->mCurrentPingCounter = seqno;
|
||||||
|
|
||||||
@ -349,7 +328,7 @@ int p3rtt::storePingAttempt(const RsPeerId& id, double ts, uint32_t seqno)
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
int p3rtt::storePongResult(const RsPeerId& id, uint32_t counter, double ts, double rtt, double offset)
|
int p3rtt::storePongResult(const RsPeerId& id, uint32_t counter, double recv_ts, double rtt, double offset)
|
||||||
{
|
{
|
||||||
RsStackMutex stack(mRttMtx); /****** LOCKED MUTEX *******/
|
RsStackMutex stack(mRttMtx); /****** LOCKED MUTEX *******/
|
||||||
|
|
||||||
@ -366,8 +345,12 @@ int p3rtt::storePongResult(const RsPeerId& id, uint32_t counter, double ts, doub
|
|||||||
{
|
{
|
||||||
peerInfo->mCurrentPongRecvd = true;
|
peerInfo->mCurrentPongRecvd = true;
|
||||||
}
|
}
|
||||||
|
#ifdef DEBUG_RTT
|
||||||
|
if(!peerInfo->mPongResults.empty())
|
||||||
|
std::cerr << "Delay since last pong: " << recv_ts - peerInfo->mPongResults.back().mTS << std::endl;
|
||||||
|
#endif
|
||||||
|
|
||||||
peerInfo->mPongResults.push_back(RsRttPongResult(ts, rtt, offset));
|
peerInfo->mPongResults.push_back(RsRttPongResult(recv_ts, rtt, offset));
|
||||||
|
|
||||||
|
|
||||||
while(peerInfo->mPongResults.size() > MAX_PONG_RESULTS)
|
while(peerInfo->mPongResults.size() > MAX_PONG_RESULTS)
|
||||||
|
@ -84,7 +84,7 @@ virtual bool recvItem(RsItem *item); // Overloaded from p3FastService.
|
|||||||
int handlePong(RsItem *item);
|
int handlePong(RsItem *item);
|
||||||
|
|
||||||
int storePingAttempt(const RsPeerId& id, double ts, uint32_t mCounter);
|
int storePingAttempt(const RsPeerId& id, double ts, uint32_t mCounter);
|
||||||
int storePongResult(const RsPeerId& id, uint32_t counter, double ts, double rtt, double offset);
|
int storePongResult(const RsPeerId& id, uint32_t counter, double recv_ts, double rtt, double offset);
|
||||||
|
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
|
Loading…
Reference in New Issue
Block a user