diff --git a/libretroshare/src/pqi/pqistreamer.cc b/libretroshare/src/pqi/pqistreamer.cc index 504cf14cb..e1bac3245 100644 --- a/libretroshare/src/pqi/pqistreamer.cc +++ b/libretroshare/src/pqi/pqistreamer.cc @@ -719,7 +719,9 @@ continue_packet: { // workout how much more to read. - bool is_partial_packet = false ; + bool is_partial_packet = false ; + bool is_packet_starting = (((char*)block)[0] == 0x11) ; + bool is_packet_ending = (((char*)block)[0] == 0x12) ; int extralen =0; int slice_offset = 0 ; @@ -905,7 +907,7 @@ continue_packet: { std::cerr << "Inputing partial packet " << RsUtil::BinToHex((char*)block,8) << std::endl; - pkt = addPartialPacket(block,extralen,slice_offset,slice_packet_id) ; + pkt = addPartialPacket(block,extralen,slice_offset,slice_packet_id,is_packet_starting,is_packet_ending) ; } else pkt = mRsSerialiser->deserialise(block, &pktlen); @@ -951,17 +953,62 @@ continue_packet: return 0; } -RsItem *pqistreamer::addPartialPacket(const void *block,uint32_t len,uint32_t slice_offset,uint32_t slice_packet_id) +RsItem *pqistreamer::addPartialPacket(void *block,uint32_t len,uint32_t slice_offset,uint32_t slice_packet_id,bool is_packet_starting,bool is_packet_ending) { + if(is_packet_starting && is_packet_ending) + { + std::cerr << "(EE) unexpected situation. Got in addPartialPacket() a full packet both starting and ending" << std::endl; + free(block) ; + return NULL ; + } + std::map::iterator it = mPartialPackets.find(slice_packet_id) ; - + if(it == mPartialPackets.end()) { - // make sure we really have starting packet. Otherwise this is an error. + // make sure we really have a starting packet. Otherwise this is an error. + + if(!is_packet_starting) + { + std::cerr << "(EE) dropping non starting packet that has no record." << std::endl; + free(block) ; + return NULL ; + } + + mPartialPackets[slice_packet_id].mem = block ; + mPartialPackets[slice_packet_id].size = len ; + + return NULL ; // no need to check for ending } else { - // make sure this is a continuing packet, otherwise this is an error. + if(is_packet_starting) + { + std::cerr << "(WW) dropping unfinished existing packet that gets to be replaced by new starting packet." << std::endl; + free(it->second.mem) ; + it->second.mem = block ; + it->second.size = len ; + + return NULL ; + } + // make sure this is a continuing packet, otherwise this is an error. + + it->second.mem = realloc(it->second.mem, it->second.size + len) ; + memcpy( &((char*)it->second.mem)[it->second.size],block,len) ; + it->second.size += len ; + free(block) ; + + if(is_packet_ending) + { + RsItem *item = mRsSerialiser->deserialise(it->second.mem, &it->second.size); + + free(it->second.mem) ; + mPartialPackets.erase(it) ; + + return item ; + } + else + return NULL ; } } diff --git a/libretroshare/src/pqi/pqistreamer.h b/libretroshare/src/pqi/pqistreamer.h index 47c8540df..79c5fbefc 100644 --- a/libretroshare/src/pqi/pqistreamer.h +++ b/libretroshare/src/pqi/pqistreamer.h @@ -40,6 +40,12 @@ // The interface does not handle connection, just communication. // possible bioflags: BIN_FLAGS_NO_CLOSE | BIN_FLAGS_NO_DELETE +struct PartialPacketRecord +{ + void *mem ; + uint32_t size ; +}; + class pqistreamer: public PQInterface { public: @@ -156,7 +162,9 @@ class pqistreamer: public PQInterface time_t mStatisticsTimeStamp ; void locked_addTrafficClue(const RsItem *pqi, uint32_t pktsize, std::list &lst); - RsItem *addPartialPacket(const void *block, uint32_t len, uint32_t slice_offset, uint32_t slice_packet_id); + RsItem *addPartialPacket(void *block, uint32_t len, uint32_t slice_offset, uint32_t slice_packet_id,bool packet_starting,bool packet_ending); + + std::map mPartialPackets ; }; #endif //MRK_PQI_STREAMER_HEADER