added on/off mechanism for packet slicing to ensure packward compatibility

This commit is contained in:
csoler 2016-04-26 09:22:24 -04:00
parent 3b685851d2
commit 153db5ca64
2 changed files with 53 additions and 7 deletions

View File

@ -48,6 +48,13 @@ static const int PQISTREAM_SLICE_FLAG_STARTS = 0x01; //
static const int PQISTREAM_SLICE_FLAG_ENDS = 0x02; // these flags should be kept in the range 0x01-0x08 static const int PQISTREAM_SLICE_FLAG_ENDS = 0x02; // these flags should be kept in the range 0x01-0x08
static const int PQISTREAM_SLICE_PROTOCOL_VERSION_ID_01 = 0x10; // Protocol version ID. Should hold on the 4 lower bits. static const int PQISTREAM_SLICE_PROTOCOL_VERSION_ID_01 = 0x10; // Protocol version ID. Should hold on the 4 lower bits.
static const int PQISTREAM_PARTIAL_PACKET_HEADER_SIZE = 8; // Same size than normal header, to make the code simpler. static const int PQISTREAM_PARTIAL_PACKET_HEADER_SIZE = 8; // Same size than normal header, to make the code simpler.
static const int PQISTREAM_PACKET_SLICING_PROBE_DELAY = 60; // send every 60 secs.
// This is a probe packet, that won't deserialise (it's empty) but will not cause problems to old peers either, since they will ignore
// it. This packet however will be understood by new peers as a signal to enable packet slicing. This should go when all peers use the
// same protocol.
static uint8_t PACKET_SLICING_PROBE_BYTES[8] = { 0x02, 0xaa, 0xbb, 0xcc, 0x00, 0x00, 0x00, 0x08 } ;
/* This removes the print statements (which hammer pqidebug) */ /* This removes the print statements (which hammer pqidebug) */
/*** /***
@ -71,6 +78,9 @@ pqistreamer::pqistreamer(RsSerialiser *rss, const RsPeerId& id, BinInterface *bi
{ {
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
mAcceptsPacketSlicing = false ; // by default. Will be turned into true when everyone's ready.
mLastSentPacketSlicingProbe = 0 ;
mAvgLastUpdate = mCurrReadTS = mCurrSentTS = time(NULL); mAvgLastUpdate = mCurrReadTS = mCurrSentTS = time(NULL);
mIncomingSize = 0 ; mIncomingSize = 0 ;
@ -103,6 +113,7 @@ pqistreamer::~pqistreamer()
{ {
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
std::cerr << "Closing pqistreamer." << std::endl;
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::~pqistreamer() Destruction!"); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::~pqistreamer() Destruction!");
if (mBio_flags & BIN_FLAGS_NO_CLOSE) if (mBio_flags & BIN_FLAGS_NO_CLOSE)
@ -192,8 +203,8 @@ void pqistreamer::updateRates()
{ {
int64_t diff = int64_t(t) - int64_t(mAvgLastUpdate) ; int64_t diff = int64_t(t) - int64_t(mAvgLastUpdate) ;
float avgReadpSec = getRate(true) * PQISTREAM_AVG_FRAC + (1.0 - PQISTREAM_AVG_FRAC) * mAvgReadCount/(1000.0 * float(diff)); float avgReadpSec = getRate(true ) * PQISTREAM_AVG_FRAC + (1.0 - PQISTREAM_AVG_FRAC) * mAvgReadCount/(1000.0 * float(diff));
float avgSentpSec = getRate(false) * PQISTREAM_AVG_FRAC + (1.0 - PQISTREAM_AVG_FRAC) * mAvgSentCount/(1000.0 * float(diff)); float avgSentpSec = getRate(false) * PQISTREAM_AVG_FRAC + (1.0 - PQISTREAM_AVG_FRAC) * mAvgSentCount/(1000.0 * float(diff));
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
std::cerr << "Peer " << PeerId() << ": Current speed estimates: " << avgReadpSec << " / " << avgSentpSec << std::endl; std::cerr << "Peer " << PeerId() << ": Current speed estimates: " << avgReadpSec << " / " << avgSentpSec << std::endl;
@ -263,6 +274,7 @@ int pqistreamer::tick_send(uint32_t timeout)
{ {
handleoutgoing_locked(); handleoutgoing_locked();
} }
return 1; return 1;
} }
@ -456,6 +468,9 @@ int pqistreamer::handleoutgoing_locked()
/* if we are not active - clear anything in the queues. */ /* if we are not active - clear anything in the queues. */
locked_clear_out_queue() ; locked_clear_out_queue() ;
std::cerr << "(II) Switching off packet slicing." << std::endl;
mAcceptsPacketSlicing = false ;
/* also remove the pending packets */ /* also remove the pending packets */
if (mPkt_wpending) if (mPkt_wpending)
{ {
@ -497,6 +512,21 @@ int pqistreamer::handleoutgoing_locked()
mPkt_wpending_size = 0 ; mPkt_wpending_size = 0 ;
int k=0; int k=0;
// Checks for inserting a packet slicing probe. We do that to send the other peer the information that packet slicing can be used.
// if so, we enable it for the session. This should be removed (because it's unnecessary) when all users have switched to the new version.
time_t now = time(NULL) ;
if((!mAcceptsPacketSlicing) && now > mLastSentPacketSlicingProbe + PQISTREAM_PACKET_SLICING_PROBE_DELAY)
{
std::cerr << "(II) Inserting packet slicing probe in traffic" << std::endl;
mPkt_wpending_size = 8 ;
mPkt_wpending = rs_malloc(8) ;
memcpy(mPkt_wpending,PACKET_SLICING_PROBE_BYTES,8) ;
mLastSentPacketSlicingProbe = now ;
}
uint32_t slice_size=0; uint32_t slice_size=0;
bool slice_starts=true ; bool slice_starts=true ;
bool slice_ends=true ; bool slice_ends=true ;
@ -504,7 +534,9 @@ int pqistreamer::handleoutgoing_locked()
do do
{ {
dta = locked_pop_out_data(PQISTREAM_OPTIMAL_PACKET_SIZE,slice_size,slice_starts,slice_ends,slice_packet_id) ; int desired_packet_size = mAcceptsPacketSlicing?PQISTREAM_OPTIMAL_PACKET_SIZE:(getRsPktMaxSize());
dta = locked_pop_out_data(desired_packet_size,slice_size,slice_starts,slice_ends,slice_packet_id) ;
if(!dta) if(!dta)
break ; break ;
@ -565,9 +597,9 @@ int pqistreamer::handleoutgoing_locked()
RsScopeTimer tmer("pqistreamer:"+PeerId().toStdString()) ; RsScopeTimer tmer("pqistreamer:"+PeerId().toStdString()) ;
// write packet. // write packet.
//#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
std::cout << "Sending Out Pkt of size " << mPkt_wpending_size << " !" << std::endl; std::cout << "Sending Out Pkt of size " << mPkt_wpending_size << " !" << std::endl;
//#endif #endif
int ss=0; int ss=0;
if (mPkt_wpending_size != (ss = mBio->senddata(mPkt_wpending, mPkt_wpending_size))) if (mPkt_wpending_size != (ss = mBio->senddata(mPkt_wpending, mPkt_wpending_size)))
@ -584,9 +616,13 @@ int pqistreamer::handleoutgoing_locked()
// ensuring exactly the same data is written (openSSL requirement). // ensuring exactly the same data is written (openSSL requirement).
return -1; return -1;
} }
#ifdef DEBUG_PQISTREAMER
else else
std::cerr << PeerId() << ": sent " << ss << " bytes " << std::endl; std::cerr << PeerId() << ": sent " << ss << " bytes " << std::endl;
#endif
++nsent; ++nsent;
outSentBytes_locked(mPkt_wpending_size); // this is the only time where we know exactly what was sent. outSentBytes_locked(mPkt_wpending_size); // this is the only time where we know exactly what was sent.
#ifdef DEBUG_TRANSFERS #ifdef DEBUG_TRANSFERS
@ -703,13 +739,21 @@ start_packet_read:
readbytes += blen; readbytes += blen;
mReading_state = reading_state_packet_started ; mReading_state = reading_state_packet_started ;
mFailed_read_attempts = 0 ; // reset failed read, as the packet has been totally read. mFailed_read_attempts = 0 ; // reset failed read, as the packet has been totally read.
// Check for packet slicing probe (04/26/2016). To be removed when everyone uses it.
if(!memcmp(block,PACKET_SLICING_PROBE_BYTES,8))
{
mAcceptsPacketSlicing = true ;
std::cerr << "(II) Enabling packet slicing!" << std::endl;
}
} }
continue_packet: continue_packet:
{ {
// workout how much more to read. // workout how much more to read.
bool is_partial_packet = false ; bool is_partial_packet = false ;
bool is_packet_starting = (((char*)block)[1] == PQISTREAM_SLICE_FLAG_STARTS) ; bool is_packet_starting = (((char*)block)[1] == PQISTREAM_SLICE_FLAG_STARTS) ; // STARTS and ENDS flags are actually never combined.
bool is_packet_ending = (((char*)block)[1] == PQISTREAM_SLICE_FLAG_ENDS) ; bool is_packet_ending = (((char*)block)[1] == PQISTREAM_SLICE_FLAG_ENDS) ;
bool is_packet_middle = (((char*)block)[1] == 0x00) ; bool is_packet_middle = (((char*)block)[1] == 0x00) ;

View File

@ -161,6 +161,8 @@ class pqistreamer: public PQInterface
std::list<RSTrafficClue> mCurrentStatsChunk_Out ; std::list<RSTrafficClue> mCurrentStatsChunk_Out ;
time_t mStatisticsTimeStamp ; time_t mStatisticsTimeStamp ;
bool mAcceptsPacketSlicing ;
time_t mLastSentPacketSlicingProbe ;
void locked_addTrafficClue(const RsItem *pqi, uint32_t pktsize, std::list<RSTrafficClue> &lst); void locked_addTrafficClue(const RsItem *pqi, uint32_t pktsize, std::list<RSTrafficClue> &lst);
RsItem *addPartialPacket(const void *block, uint32_t len, uint32_t slice_packet_id,bool packet_starting,bool packet_ending); RsItem *addPartialPacket(const void *block, uint32_t len, uint32_t slice_packet_id,bool packet_starting,bool packet_ending);