mirror of
https://github.com/RetroShare/RetroShare.git
synced 2024-10-01 02:35:48 -04:00
improved/simplified slicing protocol
This commit is contained in:
parent
dd81ce3bf3
commit
3b685851d2
@ -86,20 +86,7 @@ void pqiQoS::in_rsItem(void *ptr,int size,int priority)
|
|||||||
// }
|
// }
|
||||||
|
|
||||||
|
|
||||||
void *pqiQoS::out_rsItem()
|
void *pqiQoS::out_rsItem(uint32_t max_slice_size, uint32_t& size, bool& starts, bool& ends, uint32_t& packet_id)
|
||||||
{
|
|
||||||
bool starts,ends ;
|
|
||||||
uint32_t packet_id,offset,size ;
|
|
||||||
|
|
||||||
void *res = out_rsItem(~0u,16,offset,size,starts,ends,packet_id) ;
|
|
||||||
|
|
||||||
if(!starts || !ends)
|
|
||||||
std::cerr << "(EE) protocol error in pqiQoS. Will eventually kill connection!" << std::endl;
|
|
||||||
|
|
||||||
return res ;
|
|
||||||
}
|
|
||||||
|
|
||||||
void *pqiQoS::out_rsItem(uint32_t max_slice_size,uint32_t offset_unit,uint32_t& offset,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id)
|
|
||||||
{
|
{
|
||||||
// Go through the queues. Increment counters.
|
// Go through the queues. Increment counters.
|
||||||
|
|
||||||
@ -127,16 +114,11 @@ void *pqiQoS::out_rsItem(uint32_t max_slice_size,uint32_t offset_unit,uint32_t&
|
|||||||
|
|
||||||
// now chop a slice of this item
|
// now chop a slice of this item
|
||||||
|
|
||||||
void *res = _item_queues[last].slice(max_slice_size,offset,size,starts,ends,packet_id) ;
|
void *res = _item_queues[last].slice(max_slice_size,size,starts,ends,packet_id) ;
|
||||||
|
|
||||||
if(ends)
|
if(ends)
|
||||||
--_nb_items ;
|
--_nb_items ;
|
||||||
|
|
||||||
if( (offset % offset_unit) != 0)
|
|
||||||
std::cerr << "(EE) Severe error in pqiQoS::out_rsItem(). offset unit inconsistent with calculated offset." << std::endl;
|
|
||||||
|
|
||||||
offset /= offset_unit ;
|
|
||||||
|
|
||||||
return res ;
|
return res ;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -75,7 +75,7 @@ public:
|
|||||||
return item ;
|
return item ;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *slice(uint32_t max_size,uint32_t& offset,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id)
|
void *slice(uint32_t max_size,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id)
|
||||||
{
|
{
|
||||||
if(_items.empty())
|
if(_items.empty())
|
||||||
return NULL ;
|
return NULL ;
|
||||||
@ -87,7 +87,6 @@ public:
|
|||||||
|
|
||||||
if(rec.current_offset == 0 && rec.size < max_size)
|
if(rec.current_offset == 0 && rec.size < max_size)
|
||||||
{
|
{
|
||||||
offset = 0 ;
|
|
||||||
starts = true ;
|
starts = true ;
|
||||||
ends = true ;
|
ends = true ;
|
||||||
size = rec.size ;
|
size = rec.size ;
|
||||||
@ -151,8 +150,7 @@ public:
|
|||||||
|
|
||||||
// This function pops items from the queue, y order of priority
|
// This function pops items from the queue, y order of priority
|
||||||
//
|
//
|
||||||
void *out_rsItem() ;
|
void *out_rsItem(uint32_t max_slice_size,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id) ;
|
||||||
void *out_rsItem(uint32_t max_slice_size,uint32_t offset_unit,uint32_t& offset,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id) ;
|
|
||||||
|
|
||||||
// This function is used to queue items.
|
// This function is used to queue items.
|
||||||
//
|
//
|
||||||
|
@ -65,13 +65,15 @@ void pqiQoSstreamer::locked_clear_out_queue()
|
|||||||
_total_item_count = 0 ;
|
_total_item_count = 0 ;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *pqiQoSstreamer::locked_pop_out_data(uint32_t max_slice_size,uint32_t offset_unit,uint32_t& offset,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id)
|
void *pqiQoSstreamer::locked_pop_out_data(uint32_t max_slice_size, uint32_t& size, bool& starts, bool& ends, uint32_t& packet_id)
|
||||||
{
|
{
|
||||||
void *out = pqiQoS::out_rsItem(max_slice_size,offset_unit,offset,size,starts,ends,packet_id) ;
|
void *out = pqiQoS::out_rsItem(max_slice_size,size,starts,ends,packet_id) ;
|
||||||
|
|
||||||
if(out != NULL)
|
if(out != NULL)
|
||||||
{
|
{
|
||||||
_total_item_size -= getRsItemSize(out) ;
|
_total_item_size -= getRsItemSize(out) ;
|
||||||
|
|
||||||
|
if(ends)
|
||||||
--_total_item_count ;
|
--_total_item_count ;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -40,7 +40,7 @@ class pqiQoSstreamer: public pqithreadstreamer, public pqiQoS
|
|||||||
virtual int locked_out_queue_size() const { return _total_item_count ; }
|
virtual int locked_out_queue_size() const { return _total_item_count ; }
|
||||||
virtual void locked_clear_out_queue() ;
|
virtual void locked_clear_out_queue() ;
|
||||||
virtual int locked_compute_out_pkt_size() const { return _total_item_size ; }
|
virtual int locked_compute_out_pkt_size() const { return _total_item_size ; }
|
||||||
virtual void *locked_pop_out_data(uint32_t max_slice_size,uint32_t offset_unit,uint32_t& offset,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id);
|
virtual void *locked_pop_out_data(uint32_t max_slice_size,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id);
|
||||||
//virtual int locked_gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const; // extracting data.
|
//virtual int locked_gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const; // extracting data.
|
||||||
|
|
||||||
|
|
||||||
|
@ -44,10 +44,9 @@ static const int PQISTREAM_AVG_PERIOD = 5; // update speed estimate every
|
|||||||
static const float PQISTREAM_AVG_FRAC = 0.8; // for bandpass filter over speed estimate.
|
static const float PQISTREAM_AVG_FRAC = 0.8; // for bandpass filter over speed estimate.
|
||||||
static const int PQISTREAM_OPTIMAL_PACKET_SIZE = 512; // It is believed that this value should be lower than TCP slices and large enough as compare to encryption padding.
|
static const int PQISTREAM_OPTIMAL_PACKET_SIZE = 512; // It is believed that this value should be lower than TCP slices and large enough as compare to encryption padding.
|
||||||
// most importantly, it should be constant, so as to allow correct QoS.
|
// most importantly, it should be constant, so as to allow correct QoS.
|
||||||
static const int PQISTREAM_OPTIMAL_SLICE_OFFSET_UNIT = 16 ; // slices offset in units of 16 bits. That allows bigger numbers encoded in 4 less bits.
|
|
||||||
static const int PQISTREAM_SLICE_FLAG_STARTS = 0x01; //
|
static const int PQISTREAM_SLICE_FLAG_STARTS = 0x01; //
|
||||||
static const int PQISTREAM_SLICE_FLAG_ENDS = 0x02; // these flags should be kept in the range 0x01-0x08
|
static const int PQISTREAM_SLICE_FLAG_ENDS = 0x02; // these flags should be kept in the range 0x01-0x08
|
||||||
static const int PQISTREAM_SLICE_PROTOCOL_VERSION_ID = 0x10; // Protocol version ID. Should hold on the 4 lower bits.
|
static const int PQISTREAM_SLICE_PROTOCOL_VERSION_ID_01 = 0x10; // Protocol version ID. Should hold on the 4 lower bits.
|
||||||
static const int PQISTREAM_PARTIAL_PACKET_HEADER_SIZE = 8; // Same size than normal header, to make the code simpler.
|
static const int PQISTREAM_PARTIAL_PACKET_HEADER_SIZE = 8; // Same size than normal header, to make the code simpler.
|
||||||
|
|
||||||
/* This removes the print statements (which hammer pqidebug) */
|
/* This removes the print statements (which hammer pqidebug) */
|
||||||
@ -411,7 +410,7 @@ time_t pqistreamer::getLastIncomingTS()
|
|||||||
//
|
//
|
||||||
// Old : 02 0014 03 00000026 [data, 26 bytes] => [version 1B] [service 2B][subpacket 1B] [size 4B]
|
// Old : 02 0014 03 00000026 [data, 26 bytes] => [version 1B] [service 2B][subpacket 1B] [size 4B]
|
||||||
// New1: fv 0014 03 xxxxx sss [data, sss bytes] => [flags 0.5B version 0.5B] [service 2B][subpacket 1B] [packet counter 2.5B size 1.5B]
|
// New1: fv 0014 03 xxxxx sss [data, sss bytes] => [flags 0.5B version 0.5B] [service 2B][subpacket 1B] [packet counter 2.5B size 1.5B]
|
||||||
// New2: fx xxxxxx oooo ssss [data, sss bytes] => [flags 0.5B] [2^28 packet count] [2^16 offset (in units of 16)] [size 2^16]
|
// New2: pp ff xxxxxxxx ssss [data, sss bytes] => [flags 1B] [protocol version 1B] [2^32 packet count] [2^16 size]
|
||||||
//
|
//
|
||||||
// Flags: 0x1 => incomplete packet continued after
|
// Flags: 0x1 => incomplete packet continued after
|
||||||
// Flags: 0x2 => packet ending a previously incomplete packet
|
// Flags: 0x2 => packet ending a previously incomplete packet
|
||||||
@ -424,9 +423,9 @@ time_t pqistreamer::getLastIncomingTS()
|
|||||||
// - Encode length on 1.5 Bytes (10 bits) => max slice size = 1024
|
// - Encode length on 1.5 Bytes (10 bits) => max slice size = 1024
|
||||||
// - Encode packet ID on 2.5 Bytes (20 bits) => packet counter = [0...1056364]
|
// - Encode packet ID on 2.5 Bytes (20 bits) => packet counter = [0...1056364]
|
||||||
// Mode 2:
|
// Mode 2:
|
||||||
// - Encode flags on 0.5 Bytes ( 4 bits)
|
// - Encode protocol on 1.0 Bytes ( 8 bits)
|
||||||
// - Encode packet ID on 3.5 Bytes (28 bits) => packet counter = [0...16777216]
|
// - Encode flags on 1.0 Bytes ( 8 bits)
|
||||||
// - Encode offset on 2.0 Bytes (16 bits) => 65536 * 16 = // ax packet size = 1048576
|
// - Encode packet ID on 4.0 Bytes (32 bits) => packet counter = [0...2^32]
|
||||||
// - Encode size on 2.0 Bytes (16 bits) => 65536 // max slice size = 65536
|
// - Encode size on 2.0 Bytes (16 bits) => 65536 // max slice size = 65536
|
||||||
//
|
//
|
||||||
// - limit packet grouping to max size 1024.
|
// - limit packet grouping to max size 1024.
|
||||||
@ -498,7 +497,6 @@ int pqistreamer::handleoutgoing_locked()
|
|||||||
mPkt_wpending_size = 0 ;
|
mPkt_wpending_size = 0 ;
|
||||||
int k=0;
|
int k=0;
|
||||||
|
|
||||||
uint32_t slice_offset =0 ;
|
|
||||||
uint32_t slice_size=0;
|
uint32_t slice_size=0;
|
||||||
bool slice_starts=true ;
|
bool slice_starts=true ;
|
||||||
bool slice_ends=true ;
|
bool slice_ends=true ;
|
||||||
@ -506,7 +504,7 @@ int pqistreamer::handleoutgoing_locked()
|
|||||||
|
|
||||||
do
|
do
|
||||||
{
|
{
|
||||||
dta = locked_pop_out_data(PQISTREAM_OPTIMAL_PACKET_SIZE,PQISTREAM_OPTIMAL_SLICE_OFFSET_UNIT,slice_offset,slice_size,slice_starts,slice_ends,slice_packet_id) ;
|
dta = locked_pop_out_data(PQISTREAM_OPTIMAL_PACKET_SIZE,slice_size,slice_starts,slice_ends,slice_packet_id) ;
|
||||||
|
|
||||||
if(!dta)
|
if(!dta)
|
||||||
break ;
|
break ;
|
||||||
@ -518,13 +516,6 @@ int pqistreamer::handleoutgoing_locked()
|
|||||||
mPkt_wpending_size = 0;
|
mPkt_wpending_size = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(slice_offset > 0xfffff || (slice_offset & 0xff)!=0) // 5 f, on purpose. Not a bug.
|
|
||||||
{
|
|
||||||
std::cerr << "(EE) protocol error in pqitreamer: slice size is too large and cannot be encoded." ;
|
|
||||||
free(mPkt_wpending) ;
|
|
||||||
mPkt_wpending_size = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
if(slice_starts && slice_ends) // good old method. Send the packet as is, since it's a full packet.
|
if(slice_starts && slice_ends) // good old method. Send the packet as is, since it's a full packet.
|
||||||
{
|
{
|
||||||
std::cerr << "sending full slice, old style" << std::endl;
|
std::cerr << "sending full slice, old style" << std::endl;
|
||||||
@ -536,24 +527,24 @@ int pqistreamer::handleoutgoing_locked()
|
|||||||
}
|
}
|
||||||
else // partial packet. We make a special header for it and insert it in the stream
|
else // partial packet. We make a special header for it and insert it in the stream
|
||||||
{
|
{
|
||||||
std::cerr << "sending partial slice, packet ID=" << std::hex << slice_packet_id << std::dec << ", offset=" << slice_offset << ", size=" << slice_size << std::endl;
|
std::cerr << "sending partial slice, packet ID=" << std::hex << slice_packet_id << std::dec << ", size=" << slice_size << std::endl;
|
||||||
|
|
||||||
mPkt_wpending = realloc(mPkt_wpending,slice_size+mPkt_wpending_size+PQISTREAM_PARTIAL_PACKET_HEADER_SIZE) ;
|
mPkt_wpending = realloc(mPkt_wpending,slice_size+mPkt_wpending_size+PQISTREAM_PARTIAL_PACKET_HEADER_SIZE) ;
|
||||||
memcpy( &((char*)mPkt_wpending)[mPkt_wpending_size+PQISTREAM_PARTIAL_PACKET_HEADER_SIZE],dta,slice_size) ;
|
memcpy( &((char*)mPkt_wpending)[mPkt_wpending_size+PQISTREAM_PARTIAL_PACKET_HEADER_SIZE],dta,slice_size) ;
|
||||||
free(dta);
|
free(dta);
|
||||||
|
|
||||||
// New2: fp xxxxxx oooo ssss [data, sss bytes] => [flags 0.5B] [protocol version 0.5B] [2^24 packet count] [2^16 offset (in units of 16)] [size 2^16]
|
// New2: pp ff xxxxxxxx ssss [data, sss bytes] => [flags 1B] [protocol version 1B] [2^32 packet count] [2^16 size]
|
||||||
|
|
||||||
uint8_t partial_flags = PQISTREAM_SLICE_PROTOCOL_VERSION_ID ; // includes version. Flags are in the first half-byte
|
uint8_t partial_flags = 0 ;
|
||||||
if(slice_starts) partial_flags |= PQISTREAM_SLICE_FLAG_STARTS ;
|
if(slice_starts) partial_flags |= PQISTREAM_SLICE_FLAG_STARTS ;
|
||||||
if(slice_ends ) partial_flags |= PQISTREAM_SLICE_FLAG_ENDS ;
|
if(slice_ends ) partial_flags |= PQISTREAM_SLICE_FLAG_ENDS ;
|
||||||
|
|
||||||
((char*)mPkt_wpending)[mPkt_wpending_size+0x00] = partial_flags ;
|
((char*)mPkt_wpending)[mPkt_wpending_size+0x00] = PQISTREAM_SLICE_PROTOCOL_VERSION_ID_01 ;
|
||||||
((char*)mPkt_wpending)[mPkt_wpending_size+0x01] = uint8_t(slice_packet_id >> 16) & 0xff ;
|
((char*)mPkt_wpending)[mPkt_wpending_size+0x01] = partial_flags ;
|
||||||
((char*)mPkt_wpending)[mPkt_wpending_size+0x02] = uint8_t(slice_packet_id >> 8) & 0xff ;
|
((char*)mPkt_wpending)[mPkt_wpending_size+0x02] = uint8_t(slice_packet_id >> 24) & 0xff ;
|
||||||
((char*)mPkt_wpending)[mPkt_wpending_size+0x03] = uint8_t(slice_packet_id >> 0) & 0xff ;
|
((char*)mPkt_wpending)[mPkt_wpending_size+0x03] = uint8_t(slice_packet_id >> 16) & 0xff ;
|
||||||
((char*)mPkt_wpending)[mPkt_wpending_size+0x04] = uint8_t(slice_offset >> 12) & 0xff ;
|
((char*)mPkt_wpending)[mPkt_wpending_size+0x04] = uint8_t(slice_packet_id >> 8) & 0xff ;
|
||||||
((char*)mPkt_wpending)[mPkt_wpending_size+0x05] = uint8_t(slice_offset >> 4) & 0xff ; // not a bug. The last 4 bits are discarded because they are always 0
|
((char*)mPkt_wpending)[mPkt_wpending_size+0x05] = uint8_t(slice_packet_id >> 0) & 0xff ;
|
||||||
((char*)mPkt_wpending)[mPkt_wpending_size+0x06] = uint8_t(slice_size >> 8) & 0xff ;
|
((char*)mPkt_wpending)[mPkt_wpending_size+0x06] = uint8_t(slice_size >> 8) & 0xff ;
|
||||||
((char*)mPkt_wpending)[mPkt_wpending_size+0x07] = uint8_t(slice_size >> 0) & 0xff ;
|
((char*)mPkt_wpending)[mPkt_wpending_size+0x07] = uint8_t(slice_size >> 0) & 0xff ;
|
||||||
|
|
||||||
@ -574,9 +565,9 @@ int pqistreamer::handleoutgoing_locked()
|
|||||||
RsScopeTimer tmer("pqistreamer:"+PeerId().toStdString()) ;
|
RsScopeTimer tmer("pqistreamer:"+PeerId().toStdString()) ;
|
||||||
|
|
||||||
// write packet.
|
// write packet.
|
||||||
#ifdef DEBUG_PQISTREAMER
|
//#ifdef DEBUG_PQISTREAMER
|
||||||
std::cout << "Sending Out Pkt of size " << mPkt_wpending_size << " !" << std::endl;
|
std::cout << "Sending Out Pkt of size " << mPkt_wpending_size << " !" << std::endl;
|
||||||
#endif
|
//#endif
|
||||||
int ss=0;
|
int ss=0;
|
||||||
|
|
||||||
if (mPkt_wpending_size != (ss = mBio->senddata(mPkt_wpending, mPkt_wpending_size)))
|
if (mPkt_wpending_size != (ss = mBio->senddata(mPkt_wpending, mPkt_wpending_size)))
|
||||||
@ -718,24 +709,23 @@ continue_packet:
|
|||||||
// workout how much more to read.
|
// workout how much more to read.
|
||||||
|
|
||||||
bool is_partial_packet = false ;
|
bool is_partial_packet = false ;
|
||||||
bool is_packet_starting = (((char*)block)[0] == 0x11) ;
|
bool is_packet_starting = (((char*)block)[1] == PQISTREAM_SLICE_FLAG_STARTS) ;
|
||||||
bool is_packet_ending = (((char*)block)[0] == 0x12) ;
|
bool is_packet_ending = (((char*)block)[1] == PQISTREAM_SLICE_FLAG_ENDS) ;
|
||||||
|
bool is_packet_middle = (((char*)block)[1] == 0x00) ;
|
||||||
|
|
||||||
uint32_t extralen =0;
|
uint32_t extralen =0;
|
||||||
uint32_t slice_offset = 0 ;
|
|
||||||
uint32_t slice_packet_id =0;
|
uint32_t slice_packet_id =0;
|
||||||
|
|
||||||
if( ((char*)block)[0] == 0x10 || ((char*)block)[0] == 0x11 || ((char*)block)[0] == 0x12)
|
if( ((char*)block)[0] == PQISTREAM_SLICE_PROTOCOL_VERSION_ID_01 && ( is_packet_starting || is_packet_middle || is_packet_ending))
|
||||||
{
|
{
|
||||||
extralen = (uint32_t(((uint8_t*)block)[6]) << 8 ) + (uint32_t(((uint8_t*)block)[7]));
|
extralen = (uint32_t(((uint8_t*)block)[6]) << 8 ) + (uint32_t(((uint8_t*)block)[7]));
|
||||||
slice_offset = (uint32_t(((uint8_t*)block)[5]) << 4) + (uint32_t(((uint8_t*)block)[4]) << 12);
|
slice_packet_id = (uint32_t(((uint8_t*)block)[2]) << 24) + (uint32_t(((uint8_t*)block)[3]) << 16) + (uint32_t(((uint8_t*)block)[4]) << 8) + (uint32_t(((uint8_t*)block)[5]) << 0);
|
||||||
slice_packet_id = (uint32_t(((uint8_t*)block)[3]) << 0) + (uint32_t(((uint8_t*)block)[2]) << 8) + (uint32_t(((uint8_t*)block)[1]) << 16);
|
|
||||||
|
|
||||||
std::cerr << "Reading from mem block " << RsUtil::BinToHex((char*)block,8) << ": packet_id=" << slice_packet_id << ", len=" << extralen << ", offset=" << slice_offset << std::endl;
|
std::cerr << "Reading partial packet from mem block " << RsUtil::BinToHex((char*)block,8) << ": packet_id=" << std::hex << slice_packet_id << std::dec << ", len=" << extralen << std::endl;
|
||||||
is_partial_packet = true ;
|
is_partial_packet = true ;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
extralen = getRsItemSize(block) - blen;
|
extralen = getRsItemSize(block) - blen; // old style packet type
|
||||||
|
|
||||||
#ifdef DEBUG_PQISTREAMER
|
#ifdef DEBUG_PQISTREAMER
|
||||||
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet getRsItemSize(block) = " << getRsItemSize(block) << std::endl ;
|
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet getRsItemSize(block) = " << getRsItemSize(block) << std::endl ;
|
||||||
@ -884,7 +874,7 @@ continue_packet:
|
|||||||
{
|
{
|
||||||
std::cerr << "Inputing partial packet " << RsUtil::BinToHex((char*)block,8) << std::endl;
|
std::cerr << "Inputing partial packet " << RsUtil::BinToHex((char*)block,8) << std::endl;
|
||||||
|
|
||||||
pkt = addPartialPacket(block,pktlen,slice_offset,slice_packet_id,is_packet_starting,is_packet_ending) ;
|
pkt = addPartialPacket(block,pktlen,slice_packet_id,is_packet_starting,is_packet_ending) ;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
pkt = mRsSerialiser->deserialise(block, &pktlen);
|
pkt = mRsSerialiser->deserialise(block, &pktlen);
|
||||||
@ -927,13 +917,13 @@ continue_packet:
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
RsItem *pqistreamer::addPartialPacket(const void *block,uint32_t len,uint32_t slice_offset,uint32_t slice_packet_id,bool is_packet_starting,bool is_packet_ending)
|
RsItem *pqistreamer::addPartialPacket(const void *block,uint32_t len,uint32_t slice_packet_id,bool is_packet_starting,bool is_packet_ending)
|
||||||
{
|
{
|
||||||
std::cerr << "Receiving partial packet. size=" << len << ", offset=" << slice_offset << ". ID=" << std::hex << slice_packet_id << std::dec << ", starting:" << is_packet_starting << ", ending:" << is_packet_ending ;
|
std::cerr << "Receiving partial packet. size=" << len << ", ID=" << std::hex << slice_packet_id << std::dec << ", starting:" << is_packet_starting << ", ending:" << is_packet_ending ;
|
||||||
|
|
||||||
if(is_packet_starting && is_packet_ending)
|
if(is_packet_starting && is_packet_ending)
|
||||||
{
|
{
|
||||||
std::cerr << "(EE) unexpected situation. Got in addPartialPacket() a full packet both starting and ending" << std::endl;
|
std::cerr << " (EE) unexpected situation: both starting and ending" << std::endl;
|
||||||
return NULL ;
|
return NULL ;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -948,7 +938,7 @@ RsItem *pqistreamer::addPartialPacket(const void *block,uint32_t len,uint32_t sl
|
|||||||
|
|
||||||
if(!is_packet_starting)
|
if(!is_packet_starting)
|
||||||
{
|
{
|
||||||
std::cerr << "(EE) dropping non starting packet that has no record." << std::endl;
|
std::cerr << " (EE) non starting packet has no record. Dropping" << std::endl;
|
||||||
return NULL ;
|
return NULL ;
|
||||||
}
|
}
|
||||||
PartialPacketRecord& rec = mPartialPackets[slice_packet_id] ;
|
PartialPacketRecord& rec = mPartialPackets[slice_packet_id] ;
|
||||||
@ -957,14 +947,14 @@ RsItem *pqistreamer::addPartialPacket(const void *block,uint32_t len,uint32_t sl
|
|||||||
|
|
||||||
if(!rec.mem)
|
if(!rec.mem)
|
||||||
{
|
{
|
||||||
std::cerr << "(EE) Cannot allowcate memory for slice of size " << slice_length << std::endl;
|
std::cerr << " (EE) Cannot allocate memory for slice of size " << slice_length << std::endl;
|
||||||
return NULL ;
|
return NULL ;
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(rec.mem, slice_data, slice_length) ; ;
|
memcpy(rec.mem, slice_data, slice_length) ; ;
|
||||||
rec.size = slice_length ;
|
rec.size = slice_length ;
|
||||||
|
|
||||||
std::cerr << " => stored in new record." << std::endl;
|
std::cerr << " => stored in new record (size=" << rec.size << std::endl;
|
||||||
|
|
||||||
return NULL ; // no need to check for ending
|
return NULL ; // no need to check for ending
|
||||||
}
|
}
|
||||||
@ -984,11 +974,11 @@ RsItem *pqistreamer::addPartialPacket(const void *block,uint32_t len,uint32_t sl
|
|||||||
memcpy( &((char*)rec.mem)[rec.size],slice_data,slice_length) ;
|
memcpy( &((char*)rec.mem)[rec.size],slice_data,slice_length) ;
|
||||||
rec.size += slice_length ;
|
rec.size += slice_length ;
|
||||||
|
|
||||||
std::cerr << " => added to existing record " ;
|
std::cerr << " => added to existing record size=" << rec.size ;
|
||||||
|
|
||||||
if(is_packet_ending)
|
if(is_packet_ending)
|
||||||
{
|
{
|
||||||
std::cerr << " => deserialising: mem=" << RsUtil::BinToHex((char*)rec.mem,std::min(20u,rec.size)) << std::endl;
|
std::cerr << " => deserialising: mem=" << RsUtil::BinToHex((char*)rec.mem,std::min(8u,rec.size)) << std::endl;
|
||||||
RsItem *item = mRsSerialiser->deserialise(rec.mem, &rec.size);
|
RsItem *item = mRsSerialiser->deserialise(rec.mem, &rec.size);
|
||||||
|
|
||||||
free(rec.mem) ;
|
free(rec.mem) ;
|
||||||
@ -1232,9 +1222,8 @@ int pqistreamer::locked_gatherStatistics(std::list<RSTrafficClue>& out_lst,std::
|
|||||||
return 1 ;
|
return 1 ;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *pqistreamer::locked_pop_out_data(uint32_t max_slice_size,uint32_t offset_unit,uint32_t& offset,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id)
|
void *pqistreamer::locked_pop_out_data(uint32_t max_slice_size,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id)
|
||||||
{
|
{
|
||||||
offset = 0 ;
|
|
||||||
size = 0 ;
|
size = 0 ;
|
||||||
starts = true ;
|
starts = true ;
|
||||||
ends = true ;
|
ends = true ;
|
||||||
|
@ -81,7 +81,7 @@ class pqistreamer: public PQInterface
|
|||||||
virtual int locked_out_queue_size() const ;
|
virtual int locked_out_queue_size() const ;
|
||||||
virtual void locked_clear_out_queue() ;
|
virtual void locked_clear_out_queue() ;
|
||||||
virtual int locked_compute_out_pkt_size() const ;
|
virtual int locked_compute_out_pkt_size() const ;
|
||||||
virtual void *locked_pop_out_data(uint32_t max_slice_size,uint32_t offset_unit,uint32_t& offset,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id);
|
virtual void *locked_pop_out_data(uint32_t max_slice_size,uint32_t& size,bool& starts,bool& ends,uint32_t& packet_id);
|
||||||
virtual int locked_gatherStatistics(std::list<RSTrafficClue>& outqueue_stats,std::list<RSTrafficClue>& inqueue_stats); // extracting data.
|
virtual int locked_gatherStatistics(std::list<RSTrafficClue>& outqueue_stats,std::list<RSTrafficClue>& inqueue_stats); // extracting data.
|
||||||
|
|
||||||
void updateRates() ;
|
void updateRates() ;
|
||||||
@ -162,7 +162,7 @@ class pqistreamer: public PQInterface
|
|||||||
time_t mStatisticsTimeStamp ;
|
time_t mStatisticsTimeStamp ;
|
||||||
|
|
||||||
void locked_addTrafficClue(const RsItem *pqi, uint32_t pktsize, std::list<RSTrafficClue> &lst);
|
void locked_addTrafficClue(const RsItem *pqi, uint32_t pktsize, std::list<RSTrafficClue> &lst);
|
||||||
RsItem *addPartialPacket(const void *block, uint32_t len, uint32_t slice_offset, uint32_t slice_packet_id,bool packet_starting,bool packet_ending);
|
RsItem *addPartialPacket(const void *block, uint32_t len, uint32_t slice_packet_id,bool packet_starting,bool packet_ending);
|
||||||
|
|
||||||
std::map<uint32_t,PartialPacketRecord> mPartialPackets ;
|
std::map<uint32_t,PartialPacketRecord> mPartialPackets ;
|
||||||
};
|
};
|
||||||
|
Loading…
Reference in New Issue
Block a user