added packet reconstruction and deserialising

This commit is contained in:
csoler 2016-04-24 13:43:34 -04:00
parent cad8c89746
commit a2c11f97c2
2 changed files with 62 additions and 7 deletions

View File

@ -720,6 +720,8 @@ continue_packet:
// workout how much more to read.
bool is_partial_packet = false ;
bool is_packet_starting = (((char*)block)[0] == 0x11) ;
bool is_packet_ending = (((char*)block)[0] == 0x12) ;
int extralen =0;
int slice_offset = 0 ;
@ -905,7 +907,7 @@ continue_packet:
{
std::cerr << "Inputing partial packet " << RsUtil::BinToHex((char*)block,8) << std::endl;
pkt = addPartialPacket(block,extralen,slice_offset,slice_packet_id) ;
pkt = addPartialPacket(block,extralen,slice_offset,slice_packet_id,is_packet_starting,is_packet_ending) ;
}
else
pkt = mRsSerialiser->deserialise(block, &pktlen);
@ -951,17 +953,62 @@ continue_packet:
return 0;
}
RsItem *pqistreamer::addPartialPacket(const void *block,uint32_t len,uint32_t slice_offset,uint32_t slice_packet_id)
RsItem *pqistreamer::addPartialPacket(void *block,uint32_t len,uint32_t slice_offset,uint32_t slice_packet_id,bool is_packet_starting,bool is_packet_ending)
{
if(is_packet_starting && is_packet_ending)
{
std::cerr << "(EE) unexpected situation. Got in addPartialPacket() a full packet both starting and ending" << std::endl;
free(block) ;
return NULL ;
}
std::map<uint32_t,PartialPacketRecord>::iterator it = mPartialPackets.find(slice_packet_id) ;
if(it == mPartialPackets.end())
{
// make sure we really have starting packet. Otherwise this is an error.
// make sure we really have a starting packet. Otherwise this is an error.
if(!is_packet_starting)
{
std::cerr << "(EE) dropping non starting packet that has no record." << std::endl;
free(block) ;
return NULL ;
}
mPartialPackets[slice_packet_id].mem = block ;
mPartialPackets[slice_packet_id].size = len ;
return NULL ; // no need to check for ending
}
else
{
if(is_packet_starting)
{
std::cerr << "(WW) dropping unfinished existing packet that gets to be replaced by new starting packet." << std::endl;
free(it->second.mem) ;
it->second.mem = block ;
it->second.size = len ;
return NULL ;
}
// make sure this is a continuing packet, otherwise this is an error.
it->second.mem = realloc(it->second.mem, it->second.size + len) ;
memcpy( &((char*)it->second.mem)[it->second.size],block,len) ;
it->second.size += len ;
free(block) ;
if(is_packet_ending)
{
RsItem *item = mRsSerialiser->deserialise(it->second.mem, &it->second.size);
free(it->second.mem) ;
mPartialPackets.erase(it) ;
return item ;
}
else
return NULL ;
}
}

View File

@ -40,6 +40,12 @@
// The interface does not handle connection, just communication.
// possible bioflags: BIN_FLAGS_NO_CLOSE | BIN_FLAGS_NO_DELETE
struct PartialPacketRecord
{
void *mem ;
uint32_t size ;
};
class pqistreamer: public PQInterface
{
public:
@ -156,7 +162,9 @@ class pqistreamer: public PQInterface
time_t mStatisticsTimeStamp ;
void locked_addTrafficClue(const RsItem *pqi, uint32_t pktsize, std::list<RSTrafficClue> &lst);
RsItem *addPartialPacket(const void *block, uint32_t len, uint32_t slice_offset, uint32_t slice_packet_id);
RsItem *addPartialPacket(void *block, uint32_t len, uint32_t slice_offset, uint32_t slice_packet_id,bool packet_starting,bool packet_ending);
std::map<uint32_t,PartialPacketRecord> mPartialPackets ;
};
#endif //MRK_PQI_STREAMER_HEADER