2007-11-15 03:18:48 +00:00
/*
* " $Id: pqistreamer.cc,v 1.19 2007-02-18 21:46:50 rmf24 Exp $ "
*
* 3 P / PQI network interface for RetroShare .
*
* Copyright 2004 - 2006 by Robert Fernie .
*
* This library is free software ; you can redistribute it and / or
* modify it under the terms of the GNU Library General Public
* License Version 2 as published by the Free Software Foundation .
*
* This library is distributed in the hope that it will be useful ,
* but WITHOUT ANY WARRANTY ; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE . See the GNU
* Library General Public License for more details .
*
* You should have received a copy of the GNU Library General Public
* License along with this library ; if not , write to the Free Software
* Foundation , Inc . , 59 Temple Place , Suite 330 , Boston , MA 02111 - 1307
* USA .
*
* Please report all bugs and problems to " retroshare@lunamutt.com " .
*
*/
2008-07-10 16:29:18 +00:00
# include <iostream>
# include <fstream>
2013-10-21 11:00:49 +00:00
# include <time.h>
2008-07-10 16:29:18 +00:00
# include "util/rsdebug.h"
2012-04-14 22:38:24 +00:00
# include "util/rsstring.h"
2016-04-03 11:38:53 -04:00
# include "util/rsprint.h"
2016-04-19 22:04:30 -04:00
# include "util/rsscopetimer.h"
2007-11-15 03:18:48 +00:00
# include "pqi/pqistreamer.h"
2014-01-07 22:51:22 +00:00
# include "rsserver/p3face.h"
2008-07-10 16:29:18 +00:00
2007-12-12 01:29:14 +00:00
# include "serialiser/rsserial.h"
2007-11-15 03:18:48 +00:00
const int pqistreamerzone = 8221 ;
2016-04-23 17:10:25 -04:00
static const int PQISTREAM_ABS_MAX = 100000000 ; /* 100 MB/sec (actually per loop) */
static const int PQISTREAM_AVG_PERIOD = 5 ; // update speed estimate every 5 seconds
static const float PQISTREAM_AVG_FRAC = 0.8 ; // for bandpass filter over speed estimate.
static const int PQISTREAM_OPTIMAL_PACKET_SIZE = 512 ; // It is believed that this value should be lower than TCP slices and large enough as compare to encryption padding.
// most importantly, it should be constant, so as to allow correct QoS.
2016-04-24 21:18:44 -04:00
static const int PQISTREAM_SLICE_FLAG_STARTS = 0x01 ; //
static const int PQISTREAM_SLICE_FLAG_ENDS = 0x02 ; // these flags should be kept in the range 0x01-0x08
2016-04-25 23:37:02 -04:00
static const int PQISTREAM_SLICE_PROTOCOL_VERSION_ID_01 = 0x10 ; // Protocol version ID. Should hold on the 4 lower bits.
2016-04-23 17:10:25 -04:00
static const int PQISTREAM_PARTIAL_PACKET_HEADER_SIZE = 8 ; // Same size than normal header, to make the code simpler.
2016-04-26 09:22:24 -04:00
static const int PQISTREAM_PACKET_SLICING_PROBE_DELAY = 60 ; // send every 60 secs.
// This is a probe packet, that won't deserialise (it's empty) but will not cause problems to old peers either, since they will ignore
// it. This packet however will be understood by new peers as a signal to enable packet slicing. This should go when all peers use the
// same protocol.
static uint8_t PACKET_SLICING_PROBE_BYTES [ 8 ] = { 0x02 , 0xaa , 0xbb , 0xcc , 0x00 , 0x00 , 0x00 , 0x08 } ;
2007-11-15 03:18:48 +00:00
2008-04-03 12:51:28 +00:00
/* This removes the print statements (which hammer pqidebug) */
/***
# define RSITEM_DEBUG 1
2010-11-13 11:46:17 +00:00
# define DEBUG_TRANSFERS 1
2011-06-24 21:44:29 +00:00
# define DEBUG_PQISTREAMER 1
2016-04-26 21:23:19 -04:00
# define DEBUG_PACKET_SLICING 1
2008-04-03 12:51:28 +00:00
* * */
2010-11-13 11:46:17 +00:00
# ifdef DEBUG_TRANSFERS
2010-11-11 23:59:04 +00:00
# include "util/rsprint.h"
# endif
2008-04-03 12:51:28 +00:00
2014-03-17 20:56:06 +00:00
pqistreamer : : pqistreamer ( RsSerialiser * rss , const RsPeerId & id , BinInterface * bio_in , int bio_flags_in )
2013-10-01 10:11:34 +00:00
: PQInterface ( id ) , mStreamerMtx ( " pqistreamer " ) ,
2013-10-02 03:21:04 +00:00
mBio ( bio_in ) , mBio_flags ( bio_flags_in ) , mRsSerialiser ( rss ) ,
2015-12-12 23:07:33 -05:00
mPkt_wpending ( NULL ) , mPkt_wpending_size ( 0 ) ,
2013-10-01 10:11:34 +00:00
mTotalRead ( 0 ) , mTotalSent ( 0 ) ,
mCurrRead ( 0 ) , mCurrSent ( 0 ) ,
mAvgReadCount ( 0 ) , mAvgSentCount ( 0 )
2007-11-15 03:18:48 +00:00
{
2013-10-01 10:11:34 +00:00
RsStackMutex stack ( mStreamerMtx ) ; /**** LOCKED MUTEX ****/
2016-04-26 09:22:24 -04:00
mAcceptsPacketSlicing = false ; // by default. Will be turned into true when everyone's ready.
mLastSentPacketSlicingProbe = 0 ;
2014-10-31 21:24:42 +00:00
mAvgLastUpdate = mCurrReadTS = mCurrSentTS = time ( NULL ) ;
mIncomingSize = 0 ;
2007-11-15 03:18:48 +00:00
2015-07-13 03:04:36 +00:00
mStatisticsTimeStamp = 0 ;
2007-11-15 03:18:48 +00:00
/* allocated once */
2015-04-04 09:58:53 +00:00
mPkt_rpend_size = 0 ;
mPkt_rpending = 0 ;
2013-10-01 10:11:34 +00:00
mReading_state = reading_state_initial ;
2007-11-15 03:18:48 +00:00
// 100 B/s (minimal)
setMaxRate ( true , 0.1 ) ;
setMaxRate ( false , 0.1 ) ;
setRate ( true , 0 ) ;
setRate ( false , 0 ) ;
2012-04-14 22:38:24 +00:00
pqioutput ( PQL_DEBUG_ALL , pqistreamerzone , " pqistreamer::pqistreamer() Initialisation! " ) ;
2007-11-15 03:18:48 +00:00
if ( ! bio_in )
2009-03-03 19:40:42 +00:00
{
2012-04-14 22:38:24 +00:00
pqioutput ( PQL_ALERT , pqistreamerzone , " pqistreamer::pqistreamer() NULL bio, FATAL ERROR! " ) ;
2009-03-03 19:40:42 +00:00
exit ( 1 ) ;
2007-11-15 03:18:48 +00:00
}
2013-10-01 10:11:34 +00:00
mFailed_read_attempts = 0 ; // reset failed read, as no packet is still read.
2009-03-09 14:10:24 +00:00
2007-11-15 03:18:48 +00:00
return ;
}
pqistreamer : : ~ pqistreamer ( )
{
2013-10-01 10:11:34 +00:00
RsStackMutex stack ( mStreamerMtx ) ; /**** LOCKED MUTEX ****/
2016-04-26 21:23:19 -04:00
# ifdef DEBUG_PQISTREAMER
2016-04-26 09:22:24 -04:00
std : : cerr < < " Closing pqistreamer. " < < std : : endl ;
2016-04-26 21:23:19 -04:00
# endif
2012-04-14 22:38:24 +00:00
pqioutput ( PQL_DEBUG_ALL , pqistreamerzone , " pqistreamer::~pqistreamer() Destruction! " ) ;
2007-11-15 03:18:48 +00:00
2013-10-01 10:11:34 +00:00
if ( mBio_flags & BIN_FLAGS_NO_CLOSE )
2007-11-15 03:18:48 +00:00
{
2012-04-14 22:38:24 +00:00
pqioutput ( PQL_DEBUG_ALL , pqistreamerzone , " pqistreamer::~pqistreamer() Not Closing BinInterface! " ) ;
2007-11-15 03:18:48 +00:00
}
2013-10-01 10:11:34 +00:00
else if ( mBio )
2007-11-15 03:18:48 +00:00
{
2012-04-14 22:38:24 +00:00
pqioutput ( PQL_DEBUG_ALL , pqistreamerzone , " pqistreamer::~pqistreamer() Deleting BinInterface! " ) ;
2007-11-15 03:18:48 +00:00
2013-10-01 10:11:34 +00:00
delete mBio ;
2007-11-15 03:18:48 +00:00
}
2008-04-02 13:55:45 +00:00
/* clean up serialiser */
2013-10-01 10:11:34 +00:00
if ( mRsSerialiser )
delete mRsSerialiser ;
2008-04-02 13:55:45 +00:00
2007-11-15 03:18:48 +00:00
// clean up outgoing. (cntrl packets)
2012-06-23 12:10:41 +00:00
locked_clear_out_queue ( ) ;
2007-11-15 03:18:48 +00:00
2013-10-01 10:11:34 +00:00
if ( mPkt_wpending )
2007-11-15 03:18:48 +00:00
{
2013-10-01 10:11:34 +00:00
free ( mPkt_wpending ) ;
mPkt_wpending = NULL ;
2015-12-12 23:07:33 -05:00
mPkt_wpending_size = 0 ;
2007-11-15 03:18:48 +00:00
}
2015-04-04 09:58:53 +00:00
free_rpend_locked ( ) ;
2007-11-15 03:18:48 +00:00
2012-06-23 12:10:41 +00:00
// clean up incoming.
2014-10-31 21:24:42 +00:00
while ( ! mIncoming . empty ( ) )
2007-11-15 03:18:48 +00:00
{
2013-10-01 10:11:34 +00:00
RsItem * i = mIncoming . front ( ) ;
2014-10-31 21:24:42 +00:00
mIncoming . pop_front ( ) ;
- - mIncomingSize ;
2007-11-15 03:18:48 +00:00
delete i ;
2014-10-31 21:24:42 +00:00
}
if ( mIncomingSize ! = 0 )
std : : cerr < < " (EE) inconsistency after deleting pqistreamer queue. Remaining items: " < < mIncomingSize < < std : : endl ;
2007-11-15 03:18:48 +00:00
return ;
}
// Get/Send Items.
2011-09-04 20:01:30 +00:00
int pqistreamer : : SendItem ( RsItem * si , uint32_t & out_size )
2007-11-15 03:18:48 +00:00
{
2008-04-03 12:51:28 +00:00
# ifdef RSITEM_DEBUG
2012-04-14 22:38:24 +00:00
{
std : : string out = " pqistreamer::SendItem(): \n " ;
si - > print_string ( out ) ;
pqioutput ( PQL_DEBUG_ALL , pqistreamerzone , out ) ;
std : : cerr < < out ;
2007-11-15 03:18:48 +00:00
}
2008-04-03 12:51:28 +00:00
# endif
2007-11-15 03:18:48 +00:00
2013-10-01 10:11:34 +00:00
RsStackMutex stack ( mStreamerMtx ) ; /**** LOCKED MUTEX ****/
return queue_outpqi_locked ( si , out_size ) ;
2007-11-15 03:18:48 +00:00
}
2007-12-12 01:29:14 +00:00
RsItem * pqistreamer : : GetItem ( )
2007-11-15 03:18:48 +00:00
{
2012-09-17 20:25:22 +00:00
# ifdef DEBUG_PQISTREAMER
2012-04-14 22:38:24 +00:00
pqioutput ( PQL_DEBUG_ALL , pqistreamerzone , " pqistreamer::GetItem() " ) ;
2012-09-17 20:25:22 +00:00
# endif
2013-10-01 10:11:34 +00:00
RsStackMutex stack ( mStreamerMtx ) ; /**** LOCKED MUTEX ****/
2007-11-15 03:18:48 +00:00
2013-10-01 10:11:34 +00:00
if ( mIncoming . empty ( ) )
2011-04-08 20:57:16 +00:00
return NULL ;
2007-11-15 03:18:48 +00:00
2013-10-01 10:11:34 +00:00
RsItem * osr = mIncoming . front ( ) ;
2014-10-31 21:24:42 +00:00
mIncoming . pop_front ( ) ;
- - mIncomingSize ;
2007-11-15 03:18:48 +00:00
return osr ;
}
2016-04-09 14:48:05 -04:00
void pqistreamer : : updateRates ( )
2007-11-15 03:18:48 +00:00
{
2016-04-09 14:48:05 -04:00
// now update rates both ways.
2013-10-01 10:11:34 +00:00
2016-04-09 14:48:05 -04:00
time_t t = time ( NULL ) ; // get current timestep.
2007-11-15 03:18:48 +00:00
2016-04-09 14:48:05 -04:00
if ( t > mAvgLastUpdate + PQISTREAM_AVG_PERIOD )
{
2016-04-11 10:10:10 -04:00
int64_t diff = int64_t ( t ) - int64_t ( mAvgLastUpdate ) ;
2016-04-26 09:22:24 -04:00
float avgReadpSec = getRate ( true ) * PQISTREAM_AVG_FRAC + ( 1.0 - PQISTREAM_AVG_FRAC ) * mAvgReadCount / ( 1000.0 * float ( diff ) ) ;
float avgSentpSec = getRate ( false ) * PQISTREAM_AVG_FRAC + ( 1.0 - PQISTREAM_AVG_FRAC ) * mAvgSentCount / ( 1000.0 * float ( diff ) ) ;
2007-11-15 03:18:48 +00:00
2011-09-04 20:01:30 +00:00
# ifdef DEBUG_PQISTREAMER
2016-04-09 14:48:05 -04:00
std : : cerr < < " Peer " < < PeerId ( ) < < " : Current speed estimates: " < < avgReadpSec < < " / " < < avgSentpSec < < std : : endl ;
2011-09-04 20:01:30 +00:00
# endif
2016-04-09 14:48:05 -04:00
/* pretend our rate is zero if we are
* not bandwidthLimited ( ) .
*/
if ( mBio - > bandwidthLimited ( ) )
{
setRate ( true , avgReadpSec ) ;
setRate ( false , avgSentpSec ) ;
}
else
{
std : : cerr < < " Warning: setting to 0 " < < std : : endl ;
setRate ( true , 0 ) ;
setRate ( false , 0 ) ;
}
2007-11-15 03:18:48 +00:00
2016-04-09 14:48:05 -04:00
mAvgLastUpdate = t ;
mAvgReadCount = 0 ;
mAvgSentCount = 0 ;
}
2007-11-15 03:18:48 +00:00
}
2016-04-09 14:48:05 -04:00
2013-10-02 03:21:04 +00:00
int pqistreamer : : tick_bio ( )
{
RsStackMutex stack ( mStreamerMtx ) ; /**** LOCKED MUTEX ****/
mBio - > tick ( ) ;
/* short circuit everything is bio isn't active */
if ( ! ( mBio - > isactive ( ) ) )
{
return 0 ;
}
return 1 ;
}
int pqistreamer : : tick_recv ( uint32_t timeout )
{
RsStackMutex stack ( mStreamerMtx ) ; /**** LOCKED MUTEX ****/
if ( mBio - > moretoread ( timeout ) )
{
handleincoming_locked ( ) ;
}
2015-04-04 09:58:53 +00:00
if ( ! ( mBio - > isactive ( ) ) )
{
free_rpend_locked ( ) ;
}
2013-10-02 03:21:04 +00:00
return 1 ;
}
int pqistreamer : : tick_send ( uint32_t timeout )
{
RsStackMutex stack ( mStreamerMtx ) ; /**** LOCKED MUTEX ****/
/* short circuit everything is bio isn't active */
if ( ! ( mBio - > isactive ( ) ) )
{
return 0 ;
}
if ( mBio - > cansend ( timeout ) )
{
handleoutgoing_locked ( ) ;
}
2016-04-26 09:22:24 -04:00
2013-10-02 03:21:04 +00:00
return 1 ;
}
2007-11-15 03:18:48 +00:00
int pqistreamer : : status ( )
{
2013-10-01 10:11:34 +00:00
RsStackMutex stack ( mStreamerMtx ) ; /**** LOCKED MUTEX ****/
2012-09-17 20:25:22 +00:00
# ifdef DEBUG_PQISTREAMER
2012-04-14 22:38:24 +00:00
pqioutput ( PQL_DEBUG_ALL , pqistreamerzone , " pqistreamer::status() " ) ;
2007-11-15 03:18:48 +00:00
2013-10-01 10:11:34 +00:00
if ( mBio - > isactive ( ) )
2007-11-15 03:18:48 +00:00
{
2012-04-14 22:38:24 +00:00
std : : string out ;
2013-10-01 10:11:34 +00:00
rs_sprintf ( out , " Data in:%d out:%d " , mTotalRead , mTotalSent ) ;
2012-04-14 22:38:24 +00:00
pqioutput ( PQL_DEBUG_BASIC , pqistreamerzone , out ) ;
2007-11-15 03:18:48 +00:00
}
2012-09-17 20:25:22 +00:00
# endif
2007-11-15 03:18:48 +00:00
return 0 ;
}
2016-04-23 17:10:25 -04:00
void pqistreamer : : locked_storeInOutputQueue ( void * ptr , int , int )
2012-06-23 12:10:41 +00:00
{
2013-10-01 10:11:34 +00:00
mOutPkts . push_back ( ptr ) ;
2012-06-23 12:10:41 +00:00
}
2007-11-15 03:18:48 +00:00
//
/**************** HANDLE OUTGOING TRANSLATION + TRANSMISSION ******/
2013-10-01 10:11:34 +00:00
int pqistreamer : : queue_outpqi_locked ( RsItem * pqi , uint32_t & pktsize )
2007-11-15 03:18:48 +00:00
{
2011-09-04 20:01:30 +00:00
pktsize = 0 ;
2009-12-13 21:59:26 +00:00
# ifdef DEBUG_PQISTREAMER
std : : cerr < < " pqistreamer::queue_outpqi() called. " < < std : : endl ;
# endif
2009-03-11 20:36:51 +00:00
2007-11-15 03:18:48 +00:00
/* decide which type of packet it is */
2013-10-01 10:11:34 +00:00
pktsize = mRsSerialiser - > size ( pqi ) ;
2016-01-12 21:43:04 -05:00
void * ptr = rs_malloc ( pktsize ) ;
2016-01-12 21:10:11 -05:00
if ( ptr = = NULL )
return 0 ;
2009-02-15 20:22:34 +00:00
2009-12-13 21:59:26 +00:00
# ifdef DEBUG_PQISTREAMER
2011-09-04 20:01:30 +00:00
std : : cerr < < " pqistreamer::queue_outpqi() serializing packet with packet size : " < < pktsize < < std : : endl ;
2009-12-13 21:59:26 +00:00
# endif
2015-07-12 04:04:18 +00:00
/*******************************************************************************************/
// keep info for stats for a while. Only keep the items for the last two seconds. sec n is ongoing and second n-1
// is a full statistics chunk that can be used in the GUI
locked_addTrafficClue ( pqi , pktsize , mCurrentStatsChunk_Out ) ;
/*******************************************************************************************/
2013-10-01 10:11:34 +00:00
if ( mRsSerialiser - > serialise ( pqi , ptr , & pktsize ) )
2007-11-15 03:18:48 +00:00
{
2016-04-23 17:10:25 -04:00
locked_storeInOutputQueue ( ptr , pktsize , pqi - > priority_level ( ) ) ;
2011-09-04 20:01:30 +00:00
2013-10-01 10:11:34 +00:00
if ( ! ( mBio_flags & BIN_FLAGS_NO_DELETE ) )
2008-01-21 09:22:42 +00:00
{
delete pqi ;
}
2007-11-15 03:18:48 +00:00
return 1 ;
}
2007-12-12 01:29:14 +00:00
else
{
/* cleanup serialiser */
free ( ptr ) ;
}
2007-11-15 03:18:48 +00:00
2012-04-14 22:38:24 +00:00
std : : string out = " pqistreamer::queue_outpqi() Null Pkt generated! \n Caused By: \n " ;
pqi - > print_string ( out ) ;
pqioutput ( PQL_ALERT , pqistreamerzone , out ) ;
2007-11-15 03:18:48 +00:00
2013-10-01 10:11:34 +00:00
if ( ! ( mBio_flags & BIN_FLAGS_NO_DELETE ) )
2008-01-21 09:22:42 +00:00
{
delete pqi ;
}
2007-11-15 03:18:48 +00:00
return 1 ; // keep error internal.
}
2015-07-12 04:04:18 +00:00
int pqistreamer : : handleincomingitem_locked ( RsItem * pqi , int len )
2007-11-15 03:18:48 +00:00
{
2013-10-01 10:11:34 +00:00
2011-09-04 20:01:30 +00:00
# ifdef DEBUG_PQISTREAMER
2013-10-01 10:11:34 +00:00
pqioutput ( PQL_DEBUG_ALL , pqistreamerzone , " pqistreamer::handleincomingitem_locked() " ) ;
2011-09-04 20:01:30 +00:00
# endif
2012-01-19 16:23:57 +00:00
// timestamp last received packet.
mLastIncomingTs = time ( NULL ) ;
2007-11-15 03:18:48 +00:00
// Use overloaded Contact function
2007-12-12 01:29:14 +00:00
pqi - > PeerId ( PeerId ( ) ) ;
2014-10-31 21:24:42 +00:00
mIncoming . push_back ( pqi ) ;
+ + mIncomingSize ;
2015-07-12 04:04:18 +00:00
/*******************************************************************************************/
// keep info for stats for a while. Only keep the items for the last two seconds. sec n is ongoing and second n-1
// is a full statistics chunk that can be used in the GUI
locked_addTrafficClue ( pqi , len , mCurrentStatsChunk_In ) ;
/*******************************************************************************************/
2007-11-15 03:18:48 +00:00
return 1 ;
}
2015-07-12 04:04:18 +00:00
void pqistreamer : : locked_addTrafficClue ( const RsItem * pqi , uint32_t pktsize , std : : list < RSTrafficClue > & lst )
{
time_t now = time ( NULL ) ;
if ( now > mStatisticsTimeStamp ) // new chunk => get rid of oldest, replace old list by current list, clear current list.
{
mPreviousStatsChunk_Out = mCurrentStatsChunk_Out ;
mPreviousStatsChunk_In = mCurrentStatsChunk_In ;
mCurrentStatsChunk_Out . clear ( ) ;
mCurrentStatsChunk_In . clear ( ) ;
mStatisticsTimeStamp = now ;
}
RSTrafficClue tc ;
tc . TS = now ;
tc . size = pktsize ;
tc . priority = pqi - > priority_level ( ) ;
tc . peer_id = pqi - > PeerId ( ) ;
tc . count = 1 ;
tc . service_id = pqi - > PacketService ( ) ;
tc . service_sub_id = pqi - > PacketSubType ( ) ;
lst . push_back ( tc ) ;
}
2012-01-19 16:23:57 +00:00
time_t pqistreamer : : getLastIncomingTS ( )
{
2013-10-01 10:11:34 +00:00
RsStackMutex stack ( mStreamerMtx ) ; /**** LOCKED MUTEX ****/
2012-01-19 16:23:57 +00:00
return mLastIncomingTs ;
}
2016-04-20 22:42:09 -04:00
// Packet slicing:
//
// Old : 02 0014 03 00000026 [data, 26 bytes] => [version 1B] [service 2B][subpacket 1B] [size 4B]
// New1: fv 0014 03 xxxxx sss [data, sss bytes] => [flags 0.5B version 0.5B] [service 2B][subpacket 1B] [packet counter 2.5B size 1.5B]
2016-04-25 23:37:02 -04:00
// New2: pp ff xxxxxxxx ssss [data, sss bytes] => [flags 1B] [protocol version 1B] [2^32 packet count] [2^16 size]
2016-04-20 22:42:09 -04:00
//
// Flags: 0x1 => incomplete packet continued after
// Flags: 0x2 => packet ending a previously incomplete packet
//
// - backward compatibility:
// * send one packet with service + subpacket = ffffff. Old peers will silently ignore such packets.
// * if received, mark the peer as able to decode the new packet type
//
// Mode 1:
// - Encode length on 1.5 Bytes (10 bits) => max slice size = 1024
// - Encode packet ID on 2.5 Bytes (20 bits) => packet counter = [0...1056364]
// Mode 2:
2016-04-25 23:37:02 -04:00
// - Encode protocol on 1.0 Bytes ( 8 bits)
// - Encode flags on 1.0 Bytes ( 8 bits)
// - Encode packet ID on 4.0 Bytes (32 bits) => packet counter = [0...2^32]
2016-04-20 22:42:09 -04:00
// - Encode size on 2.0 Bytes (16 bits) => 65536 // max slice size = 65536
//
// - limit packet grouping to max size 1024.
// - new peers need to read flux, and properly extract partial sizes, and combine packets based on packet counter.
// - on sending, RS should grab slices of max size 1024 from pqiQoS. If smaller, possibly pack them together.
// pqiQoS keeps track of sliced packets and makes sure the output is consistent:
// * when a large packet needs to be send, only takes a slice and return it, and update the remaining part
// * always consider priority when taking new slices => a newly arrived fast packet will always get through.
//
// Max slice size should be customisable, depending on bandwidth.
2013-10-01 10:11:34 +00:00
int pqistreamer : : handleoutgoing_locked ( )
2007-11-15 03:18:48 +00:00
{
2011-09-04 20:01:30 +00:00
# ifdef DEBUG_PQISTREAMER
2015-12-12 11:52:48 -05:00
pqioutput ( PQL_DEBUG_ALL , pqistreamerzone , " pqistreamer::handleoutgoing_locked() " ) ;
2011-09-04 20:01:30 +00:00
# endif
2007-11-15 03:18:48 +00:00
2015-12-12 11:52:48 -05:00
int maxbytes = outAllowedBytes_locked ( ) ;
int sentbytes = 0 ;
2015-12-12 23:07:33 -05:00
2015-12-12 11:52:48 -05:00
// std::cerr << "pqistreamer: maxbytes=" << maxbytes<< std::endl ;
2007-11-15 03:18:48 +00:00
2015-12-12 11:52:48 -05:00
std : : list < void * > : : iterator it ;
2007-11-15 03:18:48 +00:00
2015-12-12 11:52:48 -05:00
// if not connection, or cannot send anything... pause.
if ( ! ( mBio - > isactive ( ) ) )
{
/* if we are not active - clear anything in the queues. */
locked_clear_out_queue ( ) ;
2016-04-26 21:23:19 -04:00
# ifdef DEBUG_PACKET_SLICING
2016-04-26 09:22:24 -04:00
std : : cerr < < " (II) Switching off packet slicing. " < < std : : endl ;
2016-04-26 21:23:19 -04:00
# endif
2016-04-26 09:22:24 -04:00
mAcceptsPacketSlicing = false ;
2015-12-12 11:52:48 -05:00
/* also remove the pending packets */
if ( mPkt_wpending )
{
free ( mPkt_wpending ) ;
mPkt_wpending = NULL ;
2015-12-12 23:07:33 -05:00
mPkt_wpending_size = 0 ;
2015-12-12 11:52:48 -05:00
}
return 0 ;
}
2007-11-15 03:18:48 +00:00
2015-12-12 11:52:48 -05:00
// a very simple round robin
2009-03-03 19:40:42 +00:00
2015-12-12 11:52:48 -05:00
bool sent = true ;
int nsent = 0 ;
while ( sent ) // catch if all items sent.
{
sent = false ;
2007-11-15 03:18:48 +00:00
2015-12-12 11:52:48 -05:00
if ( ( ! ( mBio - > cansend ( 0 ) ) ) | | ( maxbytes < sentbytes ) )
{
2010-11-11 23:59:04 +00:00
2016-04-26 21:23:19 -04:00
# ifdef DEBUG_PACKET_SLICING
2015-12-12 11:52:48 -05:00
if ( maxbytes < sentbytes )
2016-04-19 22:04:30 -04:00
std : : cerr < < " pqistreamer::handleoutgoing_locked() Stopped sending: bio not ready. maxbytes= " < < maxbytes < < " , sentbytes= " < < sentbytes < < std : : endl ;
2015-12-12 11:52:48 -05:00
else
2016-04-19 22:04:30 -04:00
std : : cerr < < " pqistreamer::handleoutgoing_locked() Stopped sending: sentbytes= " < < sentbytes < < " , max= " < < maxbytes < < std : : endl ;
2016-04-26 21:23:19 -04:00
# endif
2010-11-11 23:59:04 +00:00
2015-12-12 11:52:48 -05:00
return 0 ;
}
2016-04-23 17:10:25 -04:00
// send a out_pkt., else send out_data. unless there is a pending packet. The strategy is to
// - grab as many packets as possible while below the optimal packet size, so as to allow some packing and decrease encryption padding overhead (suposeddly)
// - limit packets size to OPTIMAL_PACKET_SIZE when sending big packets so as to keep as much QoS as possible.
2016-04-24 21:29:55 -04:00
2016-04-23 17:10:25 -04:00
if ( ! mPkt_wpending )
{
void * dta ;
mPkt_wpending_size = 0 ;
int k = 0 ;
2016-04-26 09:22:24 -04:00
// Checks for inserting a packet slicing probe. We do that to send the other peer the information that packet slicing can be used.
// if so, we enable it for the session. This should be removed (because it's unnecessary) when all users have switched to the new version.
time_t now = time ( NULL ) ;
if ( ( ! mAcceptsPacketSlicing ) & & now > mLastSentPacketSlicingProbe + PQISTREAM_PACKET_SLICING_PROBE_DELAY )
{
2016-04-26 21:23:19 -04:00
# ifdef DEBUG_PACKET_SLICING
2016-04-26 09:22:24 -04:00
std : : cerr < < " (II) Inserting packet slicing probe in traffic " < < std : : endl ;
2016-04-26 21:23:19 -04:00
# endif
2016-04-26 09:22:24 -04:00
mPkt_wpending_size = 8 ;
mPkt_wpending = rs_malloc ( 8 ) ;
memcpy ( mPkt_wpending , PACKET_SLICING_PROBE_BYTES , 8 ) ;
mLastSentPacketSlicingProbe = now ;
}
2016-04-23 17:10:25 -04:00
uint32_t slice_size = 0 ;
bool slice_starts = true ;
bool slice_ends = true ;
uint32_t slice_packet_id = 0 ;
do
{
2016-04-26 09:22:24 -04:00
int desired_packet_size = mAcceptsPacketSlicing ? PQISTREAM_OPTIMAL_PACKET_SIZE : ( getRsPktMaxSize ( ) ) ;
dta = locked_pop_out_data ( desired_packet_size , slice_size , slice_starts , slice_ends , slice_packet_id ) ;
2016-04-23 17:10:25 -04:00
if ( ! dta )
break ;
2016-04-25 23:37:02 -04:00
if ( slice_starts & & slice_ends ) // good old method. Send the packet as is, since it's a full packet.
{
2016-04-26 21:23:19 -04:00
# ifdef DEBUG_PACKET_SLICING
std : : cerr < < " sending full slice, old style. Size= " < < slice_size < < std : : endl ;
# endif
2016-04-25 23:37:02 -04:00
mPkt_wpending = realloc ( mPkt_wpending , slice_size + mPkt_wpending_size ) ;
memcpy ( & ( ( char * ) mPkt_wpending ) [ mPkt_wpending_size ] , dta , slice_size ) ;
free ( dta ) ;
mPkt_wpending_size + = slice_size ;
+ + k ;
}
else // partial packet. We make a special header for it and insert it in the stream
{
2016-04-30 11:13:51 -04:00
if ( slice_size > 0xffff | | ! mAcceptsPacketSlicing )
{
std : : cerr < < " (EE) protocol error in pqitreamer: slice size is too large and cannot be encoded. " ;
free ( mPkt_wpending ) ;
mPkt_wpending_size = 0 ;
return - 1 ;
}
2016-04-26 21:23:19 -04:00
# ifdef DEBUG_PACKET_SLICING
2016-04-25 23:37:02 -04:00
std : : cerr < < " sending partial slice, packet ID= " < < std : : hex < < slice_packet_id < < std : : dec < < " , size= " < < slice_size < < std : : endl ;
2016-04-26 21:23:19 -04:00
# endif
2016-04-25 23:37:02 -04:00
mPkt_wpending = realloc ( mPkt_wpending , slice_size + mPkt_wpending_size + PQISTREAM_PARTIAL_PACKET_HEADER_SIZE ) ;
memcpy ( & ( ( char * ) mPkt_wpending ) [ mPkt_wpending_size + PQISTREAM_PARTIAL_PACKET_HEADER_SIZE ] , dta , slice_size ) ;
free ( dta ) ;
// New2: pp ff xxxxxxxx ssss [data, sss bytes] => [flags 1B] [protocol version 1B] [2^32 packet count] [2^16 size]
uint8_t partial_flags = 0 ;
if ( slice_starts ) partial_flags | = PQISTREAM_SLICE_FLAG_STARTS ;
if ( slice_ends ) partial_flags | = PQISTREAM_SLICE_FLAG_ENDS ;
( ( char * ) mPkt_wpending ) [ mPkt_wpending_size + 0x00 ] = PQISTREAM_SLICE_PROTOCOL_VERSION_ID_01 ;
( ( char * ) mPkt_wpending ) [ mPkt_wpending_size + 0x01 ] = partial_flags ;
( ( char * ) mPkt_wpending ) [ mPkt_wpending_size + 0x02 ] = uint8_t ( slice_packet_id > > 24 ) & 0xff ;
( ( char * ) mPkt_wpending ) [ mPkt_wpending_size + 0x03 ] = uint8_t ( slice_packet_id > > 16 ) & 0xff ;
( ( char * ) mPkt_wpending ) [ mPkt_wpending_size + 0x04 ] = uint8_t ( slice_packet_id > > 8 ) & 0xff ;
( ( char * ) mPkt_wpending ) [ mPkt_wpending_size + 0x05 ] = uint8_t ( slice_packet_id > > 0 ) & 0xff ;
( ( char * ) mPkt_wpending ) [ mPkt_wpending_size + 0x06 ] = uint8_t ( slice_size > > 8 ) & 0xff ;
( ( char * ) mPkt_wpending ) [ mPkt_wpending_size + 0x07 ] = uint8_t ( slice_size > > 0 ) & 0xff ;
mPkt_wpending_size + = slice_size + PQISTREAM_PARTIAL_PACKET_HEADER_SIZE ;
+ + k ;
}
2016-04-23 17:10:25 -04:00
}
while ( mPkt_wpending_size < ( uint32_t ) maxbytes & & mPkt_wpending_size < PQISTREAM_OPTIMAL_PACKET_SIZE ) ;
2015-12-19 21:20:25 -05:00
# ifdef DEBUG_PQISTREAMER
2016-04-23 17:10:25 -04:00
if ( k > 1 )
std : : cerr < < " Packed " < < k < < " packets into " < < mPkt_wpending_size < < " bytes. " < < std : : endl ;
2015-12-19 21:20:25 -05:00
# endif
2016-04-23 17:10:25 -04:00
}
2016-04-20 22:42:09 -04:00
2015-12-12 11:52:48 -05:00
if ( mPkt_wpending )
{
// write packet.
2016-04-26 09:22:24 -04:00
# ifdef DEBUG_PQISTREAMER
std : : cout < < " Sending Out Pkt of size " < < mPkt_wpending_size < < " ! " < < std : : endl ;
# endif
2015-12-12 23:07:33 -05:00
int ss = 0 ;
2009-02-26 14:04:48 +00:00
2015-12-12 23:07:33 -05:00
if ( mPkt_wpending_size ! = ( ss = mBio - > senddata ( mPkt_wpending , mPkt_wpending_size ) ) )
2015-12-12 11:52:48 -05:00
{
2012-09-17 20:25:22 +00:00
# ifdef DEBUG_PQISTREAMER
2015-12-12 11:52:48 -05:00
std : : string out ;
2015-12-12 23:07:33 -05:00
rs_sprintf ( out , " Problems with Send Data! (only %d bytes sent, total pkt size=%d) " , ss , mPkt_wpending_size ) ;
2015-12-12 11:52:48 -05:00
// std::cerr << out << std::endl ;
pqioutput ( PQL_DEBUG_BASIC , pqistreamerzone , out ) ;
2012-09-17 20:25:22 +00:00
# endif
2016-04-19 22:04:30 -04:00
std : : cerr < < PeerId ( ) < < " : sending failed. Only " < < ss < < " bytes sent over " < < mPkt_wpending_size < < std : : endl ;
2007-11-15 03:18:48 +00:00
2015-12-12 11:52:48 -05:00
// pkt_wpending will kept til next time.
// ensuring exactly the same data is written (openSSL requirement).
return - 1 ;
}
2016-04-26 09:22:24 -04:00
# ifdef DEBUG_PQISTREAMER
2016-04-19 22:04:30 -04:00
else
std : : cerr < < PeerId ( ) < < " : sent " < < ss < < " bytes " < < std : : endl ;
2016-04-26 09:22:24 -04:00
# endif
2015-12-12 11:52:48 -05:00
+ + nsent ;
2016-04-26 09:22:24 -04:00
2016-01-05 11:43:50 -05:00
outSentBytes_locked ( mPkt_wpending_size ) ; // this is the only time where we know exactly what was sent.
2007-11-15 03:18:48 +00:00
2010-11-13 11:46:17 +00:00
# ifdef DEBUG_TRANSFERS
2015-12-12 23:07:33 -05:00
std : : cerr < < " pqistreamer::handleoutgoing_locked() Sent Packet len: " < < mPkt_wpending_size < < " @ " < < RsUtil : : AccurateTimeString ( ) ;
2015-12-12 11:52:48 -05:00
std : : cerr < < std : : endl ;
2010-11-11 23:59:04 +00:00
# endif
2015-12-12 23:07:33 -05:00
sentbytes + = mPkt_wpending_size ;
2015-12-12 11:52:48 -05:00
free ( mPkt_wpending ) ;
mPkt_wpending = NULL ;
2015-12-12 23:07:33 -05:00
mPkt_wpending_size = 0 ;
2007-11-15 03:18:48 +00:00
2015-12-12 11:52:48 -05:00
sent = true ;
}
}
2015-12-19 21:20:25 -05:00
# ifdef DEBUG_PQISTREAMER
2015-12-12 11:52:48 -05:00
if ( nsent > 0 )
std : : cerr < < " nsent = " < < nsent < < " , total bytes= " < < sentbytes < < std : : endl ;
2015-12-19 21:20:25 -05:00
# endif
2015-12-12 11:52:48 -05:00
return 1 ;
2007-11-15 03:18:48 +00:00
}
/* Handles reading from input stream.
*/
2013-10-01 10:11:34 +00:00
int pqistreamer : : handleincoming_locked ( )
2009-03-03 19:40:42 +00:00
{
2016-04-25 22:50:41 -04:00
int readbytes = 0 ;
static const int max_failed_read_attempts = 2000 ;
2009-03-03 19:40:42 +00:00
2011-09-04 20:01:30 +00:00
# ifdef DEBUG_PQISTREAMER
2016-04-25 22:50:41 -04:00
pqioutput ( PQL_DEBUG_ALL , pqistreamerzone , " pqistreamer::handleincoming_locked() " ) ;
2011-09-04 20:01:30 +00:00
# endif
2009-03-03 19:40:42 +00:00
2016-04-25 22:50:41 -04:00
if ( ! ( mBio - > isactive ( ) ) )
{
mReading_state = reading_state_initial ;
free_rpend_locked ( ) ;
return 0 ;
}
2015-04-04 09:58:53 +00:00
else
2016-04-25 22:50:41 -04:00
allocate_rpend_locked ( ) ;
2009-03-03 19:40:42 +00:00
2016-04-25 22:50:41 -04:00
// enough space to read any packet.
int maxlen = mPkt_rpend_size ;
void * block = mPkt_rpending ;
2009-03-03 19:40:42 +00:00
2016-04-25 22:50:41 -04:00
// initial read size: basic packet.
int blen = getRsPktBaseSize ( ) ; // this is valid for both packet slices and normal un-sliced packets (same header size)
2009-03-03 19:40:42 +00:00
2016-04-25 22:50:41 -04:00
int maxin = inAllowedBytes_locked ( ) ;
2009-03-03 19:40:42 +00:00
2009-07-21 20:14:31 +00:00
# ifdef DEBUG_PQISTREAMER
2016-04-25 22:50:41 -04:00
std : : cerr < < " [ " < < ( void * ) pthread_self ( ) < < " ] " < < " reading state = " < < mReading_state < < std : : endl ;
2009-07-21 20:14:31 +00:00
# endif
2016-04-25 22:50:41 -04:00
switch ( mReading_state )
{
case reading_state_initial : /*std::cerr << "jumping to start" << std::endl; */ goto start_packet_read ;
case reading_state_packet_started : /*std::cerr << "jumping to middle" << std::endl;*/ goto continue_packet ;
}
2009-03-03 19:40:42 +00:00
start_packet_read :
2016-04-25 22:50:41 -04:00
{ // scope to ensure variable visibility
// read the basic block (minimum packet size)
int tmplen ;
2009-07-21 20:14:31 +00:00
# ifdef DEBUG_PQISTREAMER
2016-04-25 22:50:41 -04:00
std : : cerr < < " [ " < < ( void * ) pthread_self ( ) < < " ] " < < " starting packet " < < std : : endl ;
2009-07-21 20:14:31 +00:00
# endif
2016-04-25 22:50:41 -04:00
memset ( block , 0 , blen ) ; // reset the block, to avoid uninitialized memory reads.
2009-03-03 19:40:42 +00:00
2016-04-25 22:50:41 -04:00
if ( blen ! = ( tmplen = mBio - > readdata ( block , blen ) ) )
{
pqioutput ( PQL_DEBUG_BASIC , pqistreamerzone , " pqistreamer::handleincoming() Didn't read BasePkt! " ) ;
2007-11-15 03:18:48 +00:00
2016-04-25 22:50:41 -04:00
// error.... (either blocked or failure)
if ( tmplen = = 0 )
{
2012-09-17 20:25:22 +00:00
# ifdef DEBUG_PQISTREAMER
2016-04-25 22:50:41 -04:00
// most likely blocked!
pqioutput ( PQL_DEBUG_BASIC , pqistreamerzone , " pqistreamer::handleincoming() read blocked " ) ;
std : : cerr < < " [ " < < ( void * ) pthread_self ( ) < < " ] " < < " given up 1 " < < std : : endl ;
2009-07-21 20:14:31 +00:00
# endif
2016-04-25 22:50:41 -04:00
return 0 ;
}
else if ( tmplen < 0 )
{
// Most likely it is that the packet is pending but could not be read by pqissl because of stream flow.
// So we return without an error, and leave the machine state in 'start_read'.
//
//pqioutput(PQL_WARNING, pqistreamerzone, "pqistreamer::handleincoming() Error in bio read");
2009-07-21 20:14:31 +00:00
# ifdef DEBUG_PQISTREAMER
2016-04-25 22:50:41 -04:00
std : : cerr < < " [ " < < ( void * ) pthread_self ( ) < < " ] " < < " given up 2, state = " < < mReading_state < < std : : endl ;
2009-07-21 20:14:31 +00:00
# endif
2016-04-25 22:50:41 -04:00
return 0 ;
}
else // tmplen > 0
{
// strange case....This should never happen as partial reads are handled by pqissl below.
2012-09-17 20:25:22 +00:00
# ifdef DEBUG_PQISTREAMER
2016-04-25 22:50:41 -04:00
std : : string out = " pqistreamer::handleincoming() Incomplete " ;
rs_sprintf_append ( out , " (Strange) read of %d bytes " , tmplen ) ;
pqioutput ( PQL_ALERT , pqistreamerzone , out ) ;
2012-09-17 20:25:22 +00:00
2016-04-25 22:50:41 -04:00
std : : cerr < < " [ " < < ( void * ) pthread_self ( ) < < " ] " < < " given up 3 " < < std : : endl ;
2009-07-21 20:14:31 +00:00
# endif
2016-04-25 22:50:41 -04:00
return - 1 ;
}
}
2009-07-21 20:14:31 +00:00
# ifdef DEBUG_PQISTREAMER
2016-04-25 22:50:41 -04:00
std : : cerr < < " [ " < < ( void * ) pthread_self ( ) < < " ] " < < " block 0 : " < < RsUtil : : BinToHex ( block , 8 ) < < std : : endl ;
2009-07-21 20:14:31 +00:00
# endif
2009-03-03 19:40:42 +00:00
2016-04-25 22:50:41 -04:00
readbytes + = blen ;
mReading_state = reading_state_packet_started ;
mFailed_read_attempts = 0 ; // reset failed read, as the packet has been totally read.
2016-04-26 09:22:24 -04:00
// Check for packet slicing probe (04/26/2016). To be removed when everyone uses it.
if ( ! memcmp ( block , PACKET_SLICING_PROBE_BYTES , 8 ) )
{
mAcceptsPacketSlicing = true ;
2016-04-26 21:23:19 -04:00
# ifdef DEBUG_PACKET_SLICING
2016-04-26 09:22:24 -04:00
std : : cerr < < " (II) Enabling packet slicing! " < < std : : endl ;
2016-04-26 21:23:19 -04:00
# endif
2016-04-26 09:22:24 -04:00
}
2016-04-25 22:50:41 -04:00
}
2009-03-03 19:40:42 +00:00
continue_packet :
2016-04-25 22:50:41 -04:00
{
// workout how much more to read.
bool is_partial_packet = false ;
2016-04-26 09:22:24 -04:00
bool is_packet_starting = ( ( ( char * ) block ) [ 1 ] = = PQISTREAM_SLICE_FLAG_STARTS ) ; // STARTS and ENDS flags are actually never combined.
2016-04-25 23:37:02 -04:00
bool is_packet_ending = ( ( ( char * ) block ) [ 1 ] = = PQISTREAM_SLICE_FLAG_ENDS ) ;
bool is_packet_middle = ( ( ( char * ) block ) [ 1 ] = = 0x00 ) ;
2016-04-25 22:50:41 -04:00
uint32_t extralen = 0 ;
uint32_t slice_packet_id = 0 ;
2016-04-25 23:37:02 -04:00
if ( ( ( char * ) block ) [ 0 ] = = PQISTREAM_SLICE_PROTOCOL_VERSION_ID_01 & & ( is_packet_starting | | is_packet_middle | | is_packet_ending ) )
2016-04-25 22:50:41 -04:00
{
2016-04-25 23:37:02 -04:00
extralen = ( uint32_t ( ( ( uint8_t * ) block ) [ 6 ] ) < < 8 ) + ( uint32_t ( ( ( uint8_t * ) block ) [ 7 ] ) ) ;
slice_packet_id = ( uint32_t ( ( ( uint8_t * ) block ) [ 2 ] ) < < 24 ) + ( uint32_t ( ( ( uint8_t * ) block ) [ 3 ] ) < < 16 ) + ( uint32_t ( ( ( uint8_t * ) block ) [ 4 ] ) < < 8 ) + ( uint32_t ( ( ( uint8_t * ) block ) [ 5 ] ) < < 0 ) ;
2016-04-25 22:50:41 -04:00
2016-04-26 21:23:19 -04:00
# ifdef DEBUG_PACKET_SLICING
2016-04-25 23:37:02 -04:00
std : : cerr < < " Reading partial packet from mem block " < < RsUtil : : BinToHex ( ( char * ) block , 8 ) < < " : packet_id= " < < std : : hex < < slice_packet_id < < std : : dec < < " , len= " < < extralen < < std : : endl ;
2016-04-26 21:23:19 -04:00
# endif
2016-04-25 22:50:41 -04:00
is_partial_packet = true ;
2016-04-29 17:53:21 -04:00
mAcceptsPacketSlicing = true ; // this is needed
2016-04-25 22:50:41 -04:00
}
else
2016-04-25 23:37:02 -04:00
extralen = getRsItemSize ( block ) - blen ; // old style packet type
2009-03-03 19:40:42 +00:00
2009-07-21 20:14:31 +00:00
# ifdef DEBUG_PQISTREAMER
2016-04-25 22:50:41 -04:00
std : : cerr < < " [ " < < ( void * ) pthread_self ( ) < < " ] " < < " continuing packet getRsItemSize(block) = " < < getRsItemSize ( block ) < < std : : endl ;
std : : cerr < < " [ " < < ( void * ) pthread_self ( ) < < " ] " < < " continuing packet extralen = " < < extralen < < std : : endl ;
std : : cerr < < " [ " < < ( void * ) pthread_self ( ) < < " ] " < < " continuing packet state= " < < mReading_state < < std : : endl ;
std : : cerr < < " [ " < < ( void * ) pthread_self ( ) < < " ] " < < " block 1 : " < < RsUtil : : BinToHex ( block , 8 ) < < std : : endl ;
2009-07-21 20:14:31 +00:00
# endif
2016-04-25 22:50:41 -04:00
if ( extralen > maxlen - blen )
{
pqioutput ( PQL_ALERT , pqistreamerzone , " ERROR: Read Packet too Big! " ) ;
2009-03-03 19:40:42 +00:00
2016-04-25 22:50:41 -04:00
p3Notify * notify = RsServer : : notify ( ) ;
if ( notify )
{
std : : string title =
" Warning: Bad Packet Read " ;
std : : string msg ;
msg = " **** WARNING **** \n " ;
msg + = " Retroshare has caught a BAD Packet Read " ;
msg + = " \n " ;
msg + = " This is normally caused by connecting to an " ;
msg + = " OLD version of Retroshare " ;
msg + = " \n " ;
rs_sprintf_append ( msg , " (M:%d B:%d E:%d) \n " , maxlen , blen , extralen ) ;
msg + = " \n " ;
msg + = " block = " ;
msg + = RsUtil : : BinToHex ( ( char * ) block , 8 ) ;
msg + = " \n " ;
msg + = " Please get your friends to upgrade to the latest version " ;
msg + = " \n " ;
msg + = " \n " ;
msg + = " If you are sure the error was not caused by an old version " ;
msg + = " \n " ;
msg + = " Please report the problem to Retroshare's developers " ;
msg + = " \n " ;
notify - > AddLogMessage ( 0 , RS_SYS_WARNING , title , msg ) ;
std : : cerr < < " pqistreamer::handle_incoming() ERROR: Read Packet too Big " < < std : : endl ;
std : : cerr < < msg ;
std : : cerr < < std : : endl ;
}
mBio - > close ( ) ;
mReading_state = reading_state_initial ; // restart at state 1.
mFailed_read_attempts = 0 ;
return - 1 ;
// Used to exit now! exit(1);
}
if ( extralen > 0 )
{
void * extradata = ( void * ) ( ( ( char * ) block ) + blen ) ;
int tmplen ;
// Don't reset the block now! If pqissl is in the middle of a multiple-chunk
// packet (larger than 16384 bytes), and pqistreamer jumped directly yo
// continue_packet:, then readdata is going to write after the beginning of
// extradata, yet not exactly at start -> the start of the packet would be wiped out.
//
// so, don't do that:
// memset( extradata,0,extralen ) ;
if ( extralen ! = ( tmplen = mBio - > readdata ( extradata , extralen ) ) )
{
2009-07-21 20:57:18 +00:00
# ifdef DEBUG_PQISTREAMER
2016-04-25 22:50:41 -04:00
if ( tmplen > 0 )
std : : cerr < < " [ " < < ( void * ) pthread_self ( ) < < " ] " < < " Incomplete packet read ! This is a real problem ;-) " < < std : : endl ;
2009-07-21 20:57:18 +00:00
# endif
2009-03-09 14:10:24 +00:00
2016-04-25 22:50:41 -04:00
if ( + + mFailed_read_attempts > max_failed_read_attempts )
{
std : : string out ;
rs_sprintf ( out , " Error Completing Read (read %d/%d) " , tmplen , extralen ) ;
std : : cerr < < out < < std : : endl ;
pqioutput ( PQL_ALERT , pqistreamerzone , out ) ;
p3Notify * notify = RsServer : : notify ( ) ;
if ( notify )
{
std : : string title = " Warning: Error Completing Read " ;
std : : string msgout ;
msgout = " **** WARNING **** \n " ;
msgout + = " Retroshare has experienced an unexpected Read ERROR " ;
msgout + = " \n " ;
rs_sprintf_append ( msgout , " (M:%d B:%d E:%d R:%d) \n " , maxlen , blen , extralen , tmplen ) ;
msgout + = " \n " ;
msgout + = " Note: this error might as well happen (rarely) when a peer disconnects in between a transmission of a large packet. \n " ;
msgout + = " If it happens manny time, please contact the developers, and send them these numbers: " ;
msgout + = " \n " ;
msgout + = " block = " ;
msgout + = RsUtil : : BinToHex ( ( char * ) block , 8 ) + " \n " ;
std : : cerr < < msgout < < std : : endl ;
}
mBio - > close ( ) ;
mReading_state = reading_state_initial ; // restart at state 1.
mFailed_read_attempts = 0 ;
return - 1 ;
}
else
{
2009-07-21 20:14:31 +00:00
# ifdef DEBUG_PQISTREAMER
2016-04-25 22:50:41 -04:00
std : : cerr < < " [ " < < ( void * ) pthread_self ( ) < < " ] " < < " given up 5, state = " < < mReading_state < < std : : endl ;
2009-07-21 20:14:31 +00:00
# endif
2016-04-25 22:50:41 -04:00
return 0 ; // this is just a SSL_WANT_READ error. Don't panic, we'll re-try the read soon.
// we assume readdata() returned either -1 or the complete read size.
}
}
2009-07-21 20:14:31 +00:00
# ifdef DEBUG_PQISTREAMER
2016-04-25 22:50:41 -04:00
std : : cerr < < " [ " < < ( void * ) pthread_self ( ) < < " ] " < < " continuing packet state= " < < mReading_state < < std : : endl ;
std : : cerr < < " [ " < < ( void * ) pthread_self ( ) < < " ] " < < " block 2 : " < < RsUtil : : BinToHex ( extradata , 8 ) < < std : : endl ;
2009-07-21 20:14:31 +00:00
# endif
2009-03-03 19:40:42 +00:00
2016-04-25 22:50:41 -04:00
mFailed_read_attempts = 0 ;
readbytes + = extralen ;
}
2009-03-03 19:40:42 +00:00
2016-04-25 22:50:41 -04:00
// create packet, based on header.
2012-09-17 20:25:22 +00:00
# ifdef DEBUG_PQISTREAMER
2016-04-25 22:50:41 -04:00
{
std : : string out ;
rs_sprintf ( out , " Read Data Block -> Incoming Pkt(%d) " , blen + extralen ) ;
//std::cerr << out ;
pqioutput ( PQL_DEBUG_BASIC , pqistreamerzone , out ) ;
}
2012-09-17 20:25:22 +00:00
# endif
2009-03-03 19:40:42 +00:00
2016-04-25 22:50:41 -04:00
uint32_t pktlen = blen + extralen ;
2009-07-21 20:14:31 +00:00
# ifdef DEBUG_PQISTREAMER
2016-04-26 21:23:19 -04:00
std : : cerr < < " [ " < < ( void * ) pthread_self ( ) < < " ] " < < RsUtil : : BinToHex ( ( char * ) block , 8 ) < < " ...: deserializing. Size= " < < pktlen < < std : : endl ;
2009-07-21 20:14:31 +00:00
# endif
2016-04-25 22:50:41 -04:00
RsItem * pkt ;
if ( is_partial_packet )
{
2016-04-26 21:23:19 -04:00
# ifdef DEBUG_PACKET_SLICING
2016-04-25 22:50:41 -04:00
std : : cerr < < " Inputing partial packet " < < RsUtil : : BinToHex ( ( char * ) block , 8 ) < < std : : endl ;
2016-04-26 21:23:19 -04:00
# endif
2016-04-25 23:37:02 -04:00
pkt = addPartialPacket ( block , pktlen , slice_packet_id , is_packet_starting , is_packet_ending ) ;
2016-04-25 22:50:41 -04:00
}
else
pkt = mRsSerialiser - > deserialise ( block , & pktlen ) ;
if ( ( pkt ! = NULL ) & & ( 0 < handleincomingitem_locked ( pkt , pktlen ) ) )
{
2012-09-17 20:25:22 +00:00
# ifdef DEBUG_PQISTREAMER
2016-04-25 22:50:41 -04:00
pqioutput ( PQL_DEBUG_BASIC , pqistreamerzone , " Successfully Read a Packet! " ) ;
2012-09-17 20:25:22 +00:00
# endif
2016-04-25 22:50:41 -04:00
inReadBytes_locked ( pktlen ) ; // only count deserialised packets, because that's what is actually been transfered.
}
else if ( ! is_partial_packet )
{
2012-09-17 20:25:22 +00:00
# ifdef DEBUG_PQISTREAMER
2016-04-25 22:50:41 -04:00
pqioutput ( PQL_ALERT , pqistreamerzone , " Failed to handle Packet! " ) ;
2012-09-17 20:25:22 +00:00
# endif
2016-04-25 22:50:41 -04:00
std : : cerr < < " Incoming Packet could not be deserialised: " < < std : : endl ;
std : : cerr < < " Incoming peer id: " < < PeerId ( ) < < std : : endl ;
if ( pktlen > = 8 )
std : : cerr < < " Packet header : " < < RsUtil : : BinToHex ( ( unsigned char * ) block , 8 ) < < std : : endl ;
if ( pktlen > 8 )
std : : cerr < < " Packet data : " < < RsUtil : : BinToHex ( ( unsigned char * ) block + 8 , std : : min ( 50u , pktlen - 8 ) ) < < ( ( pktlen > 58 ) ? " ... " : " " ) < < std : : endl ;
}
2009-03-03 19:40:42 +00:00
2016-04-25 22:50:41 -04:00
mReading_state = reading_state_initial ; // restart at state 1.
mFailed_read_attempts = 0 ; // reset failed read, as the packet has been totally read.
}
2009-03-03 19:40:42 +00:00
2016-04-25 22:50:41 -04:00
if ( maxin > readbytes & & mBio - > moretoread ( 0 ) )
goto start_packet_read ;
2009-03-03 19:40:42 +00:00
2010-11-13 11:46:17 +00:00
# ifdef DEBUG_TRANSFERS
2016-04-25 22:50:41 -04:00
if ( readbytes > = maxin )
{
std : : cerr < < " pqistreamer::handleincoming() Stopped reading as readbytes >= maxin. Read " < < readbytes < < " bytes " ;
std : : cerr < < std : : endl ;
}
2010-11-11 23:59:04 +00:00
# endif
2016-04-25 22:50:41 -04:00
return 0 ;
2009-03-03 19:40:42 +00:00
}
2007-11-15 03:18:48 +00:00
2016-04-25 23:37:02 -04:00
RsItem * pqistreamer : : addPartialPacket ( const void * block , uint32_t len , uint32_t slice_packet_id , bool is_packet_starting , bool is_packet_ending )
2016-04-23 17:10:25 -04:00
{
2016-04-26 21:23:19 -04:00
# ifdef DEBUG_PACKET_SLICING
2016-04-25 23:37:02 -04:00
std : : cerr < < " Receiving partial packet. size= " < < len < < " , ID= " < < std : : hex < < slice_packet_id < < std : : dec < < " , starting: " < < is_packet_starting < < " , ending: " < < is_packet_ending ;
2016-04-26 21:23:19 -04:00
# endif
2016-04-24 21:18:44 -04:00
2016-04-24 13:43:34 -04:00
if ( is_packet_starting & & is_packet_ending )
{
2016-04-25 23:37:02 -04:00
std : : cerr < < " (EE) unexpected situation: both starting and ending " < < std : : endl ;
2016-04-24 13:43:34 -04:00
return NULL ;
}
2016-04-24 21:18:44 -04:00
uint32_t slice_length = len - PQISTREAM_PARTIAL_PACKET_HEADER_SIZE ;
unsigned char * slice_data = & ( ( unsigned char * ) block ) [ PQISTREAM_PARTIAL_PACKET_HEADER_SIZE ] ;
2016-04-23 17:10:25 -04:00
std : : map < uint32_t , PartialPacketRecord > : : iterator it = mPartialPackets . find ( slice_packet_id ) ;
2016-04-24 13:43:34 -04:00
2016-04-23 17:10:25 -04:00
if ( it = = mPartialPackets . end ( ) )
{
2016-04-24 13:43:34 -04:00
// make sure we really have a starting packet. Otherwise this is an error.
if ( ! is_packet_starting )
{
2016-04-25 23:37:02 -04:00
std : : cerr < < " (EE) non starting packet has no record. Dropping " < < std : : endl ;
2016-04-24 13:43:34 -04:00
return NULL ;
}
2016-04-24 21:18:44 -04:00
PartialPacketRecord & rec = mPartialPackets [ slice_packet_id ] ;
2016-04-24 13:43:34 -04:00
2016-04-24 21:18:44 -04:00
rec . mem = rs_malloc ( slice_length ) ;
if ( ! rec . mem )
{
2016-04-25 23:37:02 -04:00
std : : cerr < < " (EE) Cannot allocate memory for slice of size " < < slice_length < < std : : endl ;
2016-04-24 21:18:44 -04:00
return NULL ;
}
memcpy ( rec . mem , slice_data , slice_length ) ; ;
rec . size = slice_length ;
2016-04-26 21:23:19 -04:00
# ifdef DEBUG_PACKET_SLICING
2016-04-25 23:37:02 -04:00
std : : cerr < < " => stored in new record (size= " < < rec . size < < std : : endl ;
2016-04-26 21:23:19 -04:00
# endif
2016-04-24 13:43:34 -04:00
return NULL ; // no need to check for ending
2016-04-23 17:10:25 -04:00
}
else
{
2016-04-24 21:18:44 -04:00
PartialPacketRecord & rec = it - > second ;
2016-04-24 13:43:34 -04:00
if ( is_packet_starting )
{
std : : cerr < < " (WW) dropping unfinished existing packet that gets to be replaced by new starting packet. " < < std : : endl ;
2016-04-24 21:18:44 -04:00
free ( rec . mem ) ;
rec . size = 0 ;
2016-04-24 13:43:34 -04:00
}
// make sure this is a continuing packet, otherwise this is an error.
2016-04-24 21:18:44 -04:00
rec . mem = realloc ( rec . mem , rec . size + slice_length ) ;
memcpy ( & ( ( char * ) rec . mem ) [ rec . size ] , slice_data , slice_length ) ;
rec . size + = slice_length ;
2016-04-26 21:23:19 -04:00
# ifdef DEBUG_PACKET_SLICING
2016-04-25 23:37:02 -04:00
std : : cerr < < " => added to existing record size= " < < rec . size ;
2016-04-26 21:23:19 -04:00
# endif
2016-04-24 13:43:34 -04:00
if ( is_packet_ending )
{
2016-04-26 21:23:19 -04:00
# ifdef DEBUG_PACKET_SLICING
2016-04-25 23:37:02 -04:00
std : : cerr < < " => deserialising: mem= " < < RsUtil : : BinToHex ( ( char * ) rec . mem , std : : min ( 8u , rec . size ) ) < < std : : endl ;
2016-04-26 21:23:19 -04:00
# endif
2016-04-24 21:18:44 -04:00
RsItem * item = mRsSerialiser - > deserialise ( rec . mem , & rec . size ) ;
2016-04-24 13:43:34 -04:00
2016-04-24 21:18:44 -04:00
free ( rec . mem ) ;
2016-04-24 13:43:34 -04:00
mPartialPackets . erase ( it ) ;
return item ;
}
else
2016-04-24 21:18:44 -04:00
{
2016-04-26 21:23:19 -04:00
# ifdef DEBUG_PACKET_SLICING
2016-04-24 21:18:44 -04:00
std : : cerr < < std : : endl ;
2016-04-26 21:23:19 -04:00
# endif
2016-04-24 13:43:34 -04:00
return NULL ;
2016-04-24 21:18:44 -04:00
}
2016-04-23 17:10:25 -04:00
}
}
2007-11-15 03:18:48 +00:00
/* BandWidth Management Assistance */
2013-10-01 10:11:34 +00:00
float pqistreamer : : outTimeSlice_locked ( )
2007-11-15 03:18:48 +00:00
{
2012-09-17 20:25:22 +00:00
# ifdef DEBUG_PQISTREAMER
2012-04-14 22:38:24 +00:00
pqioutput ( PQL_DEBUG_ALL , pqistreamerzone , " pqistreamer::outTimeSlice() " ) ;
2012-09-17 20:25:22 +00:00
# endif
2007-11-15 03:18:48 +00:00
2007-12-12 01:29:14 +00:00
//fixme("pqistreamer::outTimeSlice()", 1);
2007-11-15 03:18:48 +00:00
return 1 ;
}
// very simple.....
2013-10-01 10:11:34 +00:00
int pqistreamer : : outAllowedBytes_locked ( )
2007-11-15 03:18:48 +00:00
{
int t = time ( NULL ) ; // get current timestep.
/* allow a lot if not bandwidthLimited */
2013-10-01 10:11:34 +00:00
if ( ! mBio - > bandwidthLimited ( ) )
2007-11-15 03:18:48 +00:00
{
2013-10-01 10:11:34 +00:00
mCurrSent = 0 ;
mCurrSentTS = t ;
2007-11-15 03:18:48 +00:00
return PQISTREAM_ABS_MAX ;
}
2013-10-01 10:11:34 +00:00
int dt = t - mCurrSentTS ;
2007-11-15 03:18:48 +00:00
// limiter -> for when currSentTs -> 0.
if ( dt > 5 )
dt = 5 ;
int maxout = ( int ) ( getMaxRate ( false ) * 1000.0 ) ;
2013-10-01 10:11:34 +00:00
mCurrSent - = dt * maxout ;
if ( mCurrSent < 0 )
2007-11-15 03:18:48 +00:00
{
2013-10-01 10:11:34 +00:00
mCurrSent = 0 ;
2007-11-15 03:18:48 +00:00
}
2013-10-01 10:11:34 +00:00
mCurrSentTS = t ;
2007-11-15 03:18:48 +00:00
2012-09-17 20:25:22 +00:00
# ifdef DEBUG_PQISTREAMER
2012-04-14 22:38:24 +00:00
{
std : : string out ;
2013-10-01 10:11:34 +00:00
rs_sprintf ( out , " pqistreamer::outAllowedBytes() is %d/%d " , maxout - mCurrSent , maxout ) ;
2012-04-14 22:38:24 +00:00
pqioutput ( PQL_DEBUG_ALL , pqistreamerzone , out ) ;
2007-11-15 03:18:48 +00:00
}
2012-09-17 20:25:22 +00:00
# endif
2007-11-15 03:18:48 +00:00
2013-10-01 10:11:34 +00:00
return maxout - mCurrSent ;
2007-11-15 03:18:48 +00:00
}
2013-10-01 10:11:34 +00:00
int pqistreamer : : inAllowedBytes_locked ( )
2007-11-15 03:18:48 +00:00
{
int t = time ( NULL ) ; // get current timestep.
/* allow a lot if not bandwidthLimited */
2013-10-01 10:11:34 +00:00
if ( ! mBio - > bandwidthLimited ( ) )
2007-11-15 03:18:48 +00:00
{
2013-10-01 10:11:34 +00:00
mCurrReadTS = t ;
mCurrRead = 0 ;
2007-11-15 03:18:48 +00:00
return PQISTREAM_ABS_MAX ;
}
2013-10-01 10:11:34 +00:00
int dt = t - mCurrReadTS ;
2007-11-15 03:18:48 +00:00
// limiter -> for when currReadTs -> 0.
if ( dt > 5 )
dt = 5 ;
int maxin = ( int ) ( getMaxRate ( true ) * 1000.0 ) ;
2013-10-01 10:11:34 +00:00
mCurrRead - = dt * maxin ;
if ( mCurrRead < 0 )
2007-11-15 03:18:48 +00:00
{
2013-10-01 10:11:34 +00:00
mCurrRead = 0 ;
2007-11-15 03:18:48 +00:00
}
2013-10-01 10:11:34 +00:00
mCurrReadTS = t ;
2007-11-15 03:18:48 +00:00
2012-09-17 20:25:22 +00:00
# ifdef DEBUG_PQISTREAMER
2012-04-14 22:38:24 +00:00
{
std : : string out ;
2013-10-01 10:11:34 +00:00
rs_sprintf ( out , " pqistreamer::inAllowedBytes() is %d/%d " , maxin - mCurrRead , maxin ) ;
2012-04-14 22:38:24 +00:00
pqioutput ( PQL_DEBUG_ALL , pqistreamerzone , out ) ;
2007-11-15 03:18:48 +00:00
}
2012-09-17 20:25:22 +00:00
# endif
2007-11-15 03:18:48 +00:00
2013-10-01 10:11:34 +00:00
return maxin - mCurrRead ;
2007-11-15 03:18:48 +00:00
}
2016-03-02 19:00:51 -05:00
void pqistreamer : : outSentBytes_locked ( uint32_t outb )
2007-11-15 03:18:48 +00:00
{
2012-09-17 20:25:22 +00:00
# ifdef DEBUG_PQISTREAMER
2012-04-14 22:38:24 +00:00
{
std : : string out ;
rs_sprintf ( out , " pqistreamer::outSentBytes(): %d@%gkB/s " , outb , getRate ( false ) ) ;
pqioutput ( PQL_DEBUG_ALL , pqistreamerzone , out ) ;
2007-11-15 03:18:48 +00:00
}
2012-09-17 20:25:22 +00:00
# endif
2007-11-15 03:18:48 +00:00
2012-06-21 23:23:46 +00:00
/*** One theory for the massive delays - is that the queue here is filling up ****/
//#define DEBUG_LAG 1
# ifdef DEBUG_LAG
# define MIN_PKTS_FOR_MSG 100
2012-06-23 12:10:41 +00:00
if ( out_queue_size ( ) > MIN_PKTS_FOR_MSG )
2012-06-21 23:23:46 +00:00
{
std : : cerr < < " pqistreamer::outSentBytes() for: " < < PeerId ( ) ;
2012-06-23 12:10:41 +00:00
std : : cerr < < " End of Write and still " < < out_queue_size ( ) < < " pkts left " ;
2012-06-21 23:23:46 +00:00
std : : cerr < < std : : endl ;
}
# endif
2013-10-01 10:11:34 +00:00
mTotalSent + = outb ;
mCurrSent + = outb ;
mAvgSentCount + = outb ;
2007-11-15 03:18:48 +00:00
return ;
}
2016-03-02 19:00:51 -05:00
void pqistreamer : : inReadBytes_locked ( uint32_t inb )
2007-11-15 03:18:48 +00:00
{
2012-09-17 20:25:22 +00:00
# ifdef DEBUG_PQISTREAMER
2012-04-14 22:38:24 +00:00
{
std : : string out ;
rs_sprintf ( out , " pqistreamer::inReadBytes(): %d@%gkB/s " , inb , getRate ( true ) ) ;
pqioutput ( PQL_DEBUG_ALL , pqistreamerzone , out ) ;
2007-11-15 03:18:48 +00:00
}
2012-09-17 20:25:22 +00:00
# endif
2007-11-15 03:18:48 +00:00
2013-10-01 10:11:34 +00:00
mTotalRead + = inb ;
mCurrRead + = inb ;
mAvgReadCount + = inb ;
2007-11-15 03:18:48 +00:00
return ;
}
2015-04-04 09:58:53 +00:00
void pqistreamer : : allocate_rpend_locked ( )
{
if ( mPkt_rpending )
return ;
mPkt_rpend_size = getRsPktMaxSize ( ) ;
2016-01-12 21:43:04 -05:00
mPkt_rpending = rs_malloc ( mPkt_rpend_size ) ;
2016-01-12 21:10:11 -05:00
if ( mPkt_rpending = = NULL )
return ;
2015-04-04 09:58:53 +00:00
// avoid uninitialized (and random) memory read.
memset ( mPkt_rpending , 0 , mPkt_rpend_size ) ;
}
void pqistreamer : : free_rpend_locked ( )
{
if ( ! mPkt_rpending )
return ;
free ( mPkt_rpending ) ;
mPkt_rpending = 0 ;
mPkt_rpend_size = 0 ;
}
2015-07-12 04:04:18 +00:00
int pqistreamer : : gatherStatistics ( std : : list < RSTrafficClue > & outqueue_lst , std : : list < RSTrafficClue > & inqueue_lst )
{
RsStackMutex stack ( mStreamerMtx ) ; /**** LOCKED MUTEX ****/
return locked_gatherStatistics ( outqueue_lst , inqueue_lst ) ;
}
2012-06-22 01:35:32 +00:00
int pqistreamer : : getQueueSize ( bool in )
{
2013-10-01 10:11:34 +00:00
RsStackMutex stack ( mStreamerMtx ) ; /**** LOCKED MUTEX ****/
2012-06-22 01:35:32 +00:00
if ( in )
2014-10-31 21:24:42 +00:00
return mIncomingSize ;
else
return locked_out_queue_size ( ) ;
2012-06-22 01:35:32 +00:00
}
void pqistreamer : : getRates ( RsBwRates & rates )
{
RateInterface : : getRates ( rates ) ;
2013-10-01 10:11:34 +00:00
RsStackMutex stack ( mStreamerMtx ) ; /**** LOCKED MUTEX ****/
2014-10-31 21:24:42 +00:00
rates . mQueueIn = mIncomingSize ;
2013-10-01 10:11:34 +00:00
rates . mQueueOut = locked_out_queue_size ( ) ;
2012-06-22 01:35:32 +00:00
}
2013-10-01 10:11:34 +00:00
int pqistreamer : : locked_out_queue_size ( ) const
2012-06-23 12:10:41 +00:00
{
// Warning: because out_pkt is a list, calling size
// is O(n) ! Makign algorithms pretty inefficient. We should record how many
// items get stored and discarded to have a proper size value at any time
//
2013-10-01 10:11:34 +00:00
return mOutPkts . size ( ) ;
2012-06-23 12:10:41 +00:00
}
void pqistreamer : : locked_clear_out_queue ( )
{
2013-10-01 10:11:34 +00:00
for ( std : : list < void * > : : iterator it = mOutPkts . begin ( ) ; it ! = mOutPkts . end ( ) ; )
2012-06-23 12:10:41 +00:00
{
free ( * it ) ;
2013-10-01 10:11:34 +00:00
it = mOutPkts . erase ( it ) ;
2012-06-23 12:10:41 +00:00
# ifdef DEBUG_PQISTREAMER
2013-10-01 10:11:34 +00:00
std : : string out = " pqistreamer::locked_clear_out_queue() Not active -> Clearing Pkt! " ;
std : : cerr < < out < < std : : endl ;
2012-06-23 12:10:41 +00:00
pqioutput ( PQL_DEBUG_BASIC , pqistreamerzone , out ) ;
# endif
}
}
int pqistreamer : : locked_compute_out_pkt_size ( ) const
{
int total = 0 ;
2013-10-01 10:11:34 +00:00
for ( std : : list < void * > : : const_iterator it = mOutPkts . begin ( ) ; it ! = mOutPkts . end ( ) ; + + it )
2012-06-23 12:10:41 +00:00
total + = getRsItemSize ( * it ) ;
2012-06-22 01:35:32 +00:00
2012-06-23 12:10:41 +00:00
return total ;
}
2015-07-12 04:04:18 +00:00
int pqistreamer : : locked_gatherStatistics ( std : : list < RSTrafficClue > & out_lst , std : : list < RSTrafficClue > & in_lst )
2014-10-31 21:24:42 +00:00
{
2015-07-13 03:04:36 +00:00
out_lst = mPreviousStatsChunk_Out ;
in_lst = mPreviousStatsChunk_In ;
2014-10-31 21:24:42 +00:00
return 1 ;
}
2016-04-25 23:37:02 -04:00
void * pqistreamer : : locked_pop_out_data ( uint32_t max_slice_size , uint32_t & size , bool & starts , bool & ends , uint32_t & packet_id )
2012-06-23 12:10:41 +00:00
{
2016-04-23 17:10:25 -04:00
size = 0 ;
starts = true ;
ends = true ;
packet_id = 0 ;
2012-06-23 12:10:41 +00:00
void * res = NULL ;
2013-10-01 10:11:34 +00:00
if ( ! mOutPkts . empty ( ) )
2012-06-23 12:10:41 +00:00
{
2013-10-01 10:11:34 +00:00
res = * ( mOutPkts . begin ( ) ) ;
mOutPkts . pop_front ( ) ;
2012-06-23 12:10:41 +00:00
# ifdef DEBUG_TRANSFERS
2013-10-01 10:11:34 +00:00
std : : cerr < < " pqistreamer::locked_pop_out_data() getting next pkt from mOutPkts queue " ;
2012-06-23 12:10:41 +00:00
std : : cerr < < std : : endl ;
# endif
}
return res ;
}
2016-04-23 17:10:25 -04:00