diff --git a/libretroshare/src/pqi/pqistreamer.cc b/libretroshare/src/pqi/pqistreamer.cc index c04c2ea9a..42b00bda3 100644 --- a/libretroshare/src/pqi/pqistreamer.cc +++ b/libretroshare/src/pqi/pqistreamer.cc @@ -624,335 +624,307 @@ int pqistreamer::handleoutgoing_locked() */ int pqistreamer::handleincoming_locked() { - int readbytes = 0; - static const int max_failed_read_attempts = 2000 ; + int readbytes = 0; + static const int max_failed_read_attempts = 2000 ; #ifdef DEBUG_PQISTREAMER - pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::handleincoming_locked()"); + pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::handleincoming_locked()"); #endif - if(!(mBio->isactive())) - { - mReading_state = reading_state_initial ; - free_rpend_locked(); - return 0; - } + if(!(mBio->isactive())) + { + mReading_state = reading_state_initial ; + free_rpend_locked(); + return 0; + } else - allocate_rpend_locked(); + allocate_rpend_locked(); - // enough space to read any packet. - int maxlen = mPkt_rpend_size; - void *block = mPkt_rpending; + // enough space to read any packet. + int maxlen = mPkt_rpend_size; + void *block = mPkt_rpending; - // initial read size: basic packet. - int blen = getRsPktBaseSize(); // this is valid for both packet slices and normal un-sliced packets (same header size) + // initial read size: basic packet. + int blen = getRsPktBaseSize(); // this is valid for both packet slices and normal un-sliced packets (same header size) - int maxin = inAllowedBytes_locked(); + int maxin = inAllowedBytes_locked(); #ifdef DEBUG_PQISTREAMER - std::cerr << "[" << (void*)pthread_self() << "] " << "reading state = " << mReading_state << std::endl ; + std::cerr << "[" << (void*)pthread_self() << "] " << "reading state = " << mReading_state << std::endl ; #endif - switch(mReading_state) - { - case reading_state_initial: /*std::cerr << "jumping to start" << std::endl; */ goto start_packet_read ; - case reading_state_packet_started: /*std::cerr << "jumping to middle" << std::endl;*/ goto continue_packet ; - } + switch(mReading_state) + { + case reading_state_initial: /*std::cerr << "jumping to start" << std::endl; */ goto start_packet_read ; + case reading_state_packet_started: /*std::cerr << "jumping to middle" << std::endl;*/ goto continue_packet ; + } start_packet_read: - { // scope to ensure variable visibility - // read the basic block (minimum packet size) - int tmplen; + { // scope to ensure variable visibility + // read the basic block (minimum packet size) + int tmplen; #ifdef DEBUG_PQISTREAMER - std::cerr << "[" << (void*)pthread_self() << "] " << "starting packet" << std::endl ; + std::cerr << "[" << (void*)pthread_self() << "] " << "starting packet" << std::endl ; #endif - memset(block,0,blen) ; // reset the block, to avoid uninitialized memory reads. + memset(block,0,blen) ; // reset the block, to avoid uninitialized memory reads. - if (blen != (tmplen = mBio->readdata(block, blen))) - { - pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "pqistreamer::handleincoming() Didn't read BasePkt!"); + if (blen != (tmplen = mBio->readdata(block, blen))) + { + pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "pqistreamer::handleincoming() Didn't read BasePkt!"); - // error.... (either blocked or failure) - if (tmplen == 0) - { + // error.... (either blocked or failure) + if (tmplen == 0) + { #ifdef DEBUG_PQISTREAMER - // most likely blocked! - pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "pqistreamer::handleincoming() read blocked"); - std::cerr << "[" << (void*)pthread_self() << "] " << "given up 1" << std::endl ; + // most likely blocked! + pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "pqistreamer::handleincoming() read blocked"); + std::cerr << "[" << (void*)pthread_self() << "] " << "given up 1" << std::endl ; #endif - return 0; - } - else if (tmplen < 0) - { - // Most likely it is that the packet is pending but could not be read by pqissl because of stream flow. - // So we return without an error, and leave the machine state in 'start_read'. - // - //pqioutput(PQL_WARNING, pqistreamerzone, "pqistreamer::handleincoming() Error in bio read"); + return 0; + } + else if (tmplen < 0) + { + // Most likely it is that the packet is pending but could not be read by pqissl because of stream flow. + // So we return without an error, and leave the machine state in 'start_read'. + // + //pqioutput(PQL_WARNING, pqistreamerzone, "pqistreamer::handleincoming() Error in bio read"); #ifdef DEBUG_PQISTREAMER - std::cerr << "[" << (void*)pthread_self() << "] " << "given up 2, state = " << mReading_state << std::endl ; + std::cerr << "[" << (void*)pthread_self() << "] " << "given up 2, state = " << mReading_state << std::endl ; #endif - return 0; - } - else // tmplen > 0 - { - // strange case....This should never happen as partial reads are handled by pqissl below. + return 0; + } + else // tmplen > 0 + { + // strange case....This should never happen as partial reads are handled by pqissl below. #ifdef DEBUG_PQISTREAMER - std::string out = "pqistreamer::handleincoming() Incomplete "; - rs_sprintf_append(out, "(Strange) read of %d bytes", tmplen); - pqioutput(PQL_ALERT, pqistreamerzone, out); + std::string out = "pqistreamer::handleincoming() Incomplete "; + rs_sprintf_append(out, "(Strange) read of %d bytes", tmplen); + pqioutput(PQL_ALERT, pqistreamerzone, out); - std::cerr << "[" << (void*)pthread_self() << "] " << "given up 3" << std::endl ; + std::cerr << "[" << (void*)pthread_self() << "] " << "given up 3" << std::endl ; #endif - return -1; - } - } + return -1; + } + } #ifdef DEBUG_PQISTREAMER - std::cerr << "[" << (void*)pthread_self() << "] " << "block 0 : " << (int)(((unsigned char*)block)[0]) << " " << (int)(((unsigned char*)block)[1]) << " " << (int)(((unsigned char*)block)[2]) << " " - << (int)(((unsigned char*)block)[3]) << " " - << (int)(((unsigned char*)block)[4]) << " " - << (int)(((unsigned char*)block)[5]) << " " - << (int)(((unsigned char*)block)[6]) << " " - << (int)(((unsigned char*)block)[7]) << " " << std::endl ; + std::cerr << "[" << (void*)pthread_self() << "] " << "block 0 : " << RsUtil::BinToHex(block,8) << std::endl; #endif - readbytes += blen; - mReading_state = reading_state_packet_started ; - mFailed_read_attempts = 0 ; // reset failed read, as the packet has been totally read. - } + readbytes += blen; + mReading_state = reading_state_packet_started ; + mFailed_read_attempts = 0 ; // reset failed read, as the packet has been totally read. + } continue_packet: - { - // workout how much more to read. - - bool is_partial_packet = false ; - bool is_packet_starting = (((char*)block)[0] == 0x11) ; - bool is_packet_ending = (((char*)block)[0] == 0x12) ; - - uint32_t extralen =0; - uint32_t slice_offset = 0 ; - uint32_t slice_packet_id =0; - - if( ((char*)block)[0] == 0x10 || ((char*)block)[0] == 0x11 || ((char*)block)[0] == 0x12) - { - extralen = (uint32_t(((uint8_t*)block)[6]) << 8) + (uint32_t(((uint8_t*)block)[7])); - slice_offset = (uint32_t(((uint8_t*)block)[5]) << 4) + (uint32_t(((uint8_t*)block)[4]) << 12); - slice_packet_id = (uint32_t(((uint8_t*)block)[3]) << 0) + (uint32_t(((uint8_t*)block)[2]) << 8) + (uint32_t(((uint8_t*)block)[1]) << 16); - - std::cerr << "Reading from mem block " << RsUtil::BinToHex((char*)block,8) << ": packet_id=" << slice_packet_id << ", len=" << extralen << ", offset=" << slice_offset << std::endl; - is_partial_packet = true ; - } - else - extralen = getRsItemSize(block) - blen; + { + // workout how much more to read. + + bool is_partial_packet = false ; + bool is_packet_starting = (((char*)block)[0] == 0x11) ; + bool is_packet_ending = (((char*)block)[0] == 0x12) ; + + uint32_t extralen =0; + uint32_t slice_offset = 0 ; + uint32_t slice_packet_id =0; + + if( ((char*)block)[0] == 0x10 || ((char*)block)[0] == 0x11 || ((char*)block)[0] == 0x12) + { + extralen = (uint32_t(((uint8_t*)block)[6]) << 8) + (uint32_t(((uint8_t*)block)[7])); + slice_offset = (uint32_t(((uint8_t*)block)[5]) << 4) + (uint32_t(((uint8_t*)block)[4]) << 12); + slice_packet_id = (uint32_t(((uint8_t*)block)[3]) << 0) + (uint32_t(((uint8_t*)block)[2]) << 8) + (uint32_t(((uint8_t*)block)[1]) << 16); + + std::cerr << "Reading from mem block " << RsUtil::BinToHex((char*)block,8) << ": packet_id=" << slice_packet_id << ", len=" << extralen << ", offset=" << slice_offset << std::endl; + is_partial_packet = true ; + } + else + extralen = getRsItemSize(block) - blen; #ifdef DEBUG_PQISTREAMER - std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet getRsItemSize(block) = " << getRsItemSize(block) << std::endl ; - std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet extralen = " << extralen << std::endl ; + std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet getRsItemSize(block) = " << getRsItemSize(block) << std::endl ; + std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet extralen = " << extralen << std::endl ; - std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << mReading_state << std::endl ; - std::cerr << "[" << (void*)pthread_self() << "] " << "block 1 : " << (int)(((unsigned char*)block)[0]) << " " << (int)(((unsigned char*)block)[1]) << " " << (int)(((unsigned char*)block)[2]) << " " << (int)(((unsigned char*)block)[3]) << " " - << (int)(((unsigned char*)block)[4]) << " " - << (int)(((unsigned char*)block)[5]) << " " - << (int)(((unsigned char*)block)[6]) << " " - << (int)(((unsigned char*)block)[7]) << " " << std::endl ; + std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << mReading_state << std::endl ; + std::cerr << "[" << (void*)pthread_self() << "] " << "block 1 : " << RsUtil::BinToHex(block,8) << std::endl; #endif - if (extralen > maxlen - blen) - { - pqioutput(PQL_ALERT, pqistreamerzone, "ERROR: Read Packet too Big!"); + if (extralen > maxlen - blen) + { + pqioutput(PQL_ALERT, pqistreamerzone, "ERROR: Read Packet too Big!"); - p3Notify *notify = RsServer::notify(); - if (notify) - { - std::string title = - "Warning: Bad Packet Read"; + p3Notify *notify = RsServer::notify(); + if (notify) + { + std::string title = + "Warning: Bad Packet Read"; - std::string msg; - msg = " **** WARNING **** \n"; - msg += "Retroshare has caught a BAD Packet Read"; - msg += "\n"; - msg += "This is normally caused by connecting to an"; - msg += " OLD version of Retroshare"; - msg += "\n"; - rs_sprintf_append(msg, "(M:%d B:%d E:%d)\n", maxlen, blen, extralen); - msg += "\n"; - rs_sprintf_append(msg, "block = %d %d %d %d %d %d %d %d\n", - (int)(((unsigned char*)block)[0]), - (int)(((unsigned char*)block)[1]), - (int)(((unsigned char*)block)[2]), - (int)(((unsigned char*)block)[3]), - (int)(((unsigned char*)block)[4]), - (int)(((unsigned char*)block)[5]), - (int)(((unsigned char*)block)[6]), - (int)(((unsigned char*)block)[7])) ; - msg += "\n"; - msg += "Please get your friends to upgrade to the latest version"; - msg += "\n"; - msg += "\n"; - msg += "If you are sure the error was not caused by an old version"; - msg += "\n"; - msg += "Please report the problem to Retroshare's developers"; - msg += "\n"; + std::string msg; + msg = " **** WARNING **** \n"; + msg += "Retroshare has caught a BAD Packet Read"; + msg += "\n"; + msg += "This is normally caused by connecting to an"; + msg += " OLD version of Retroshare"; + msg += "\n"; + rs_sprintf_append(msg, "(M:%d B:%d E:%d)\n", maxlen, blen, extralen); + msg += "\n"; + msg += "block = " ; + msg += RsUtil::BinToHex((char*)block,8); - notify->AddLogMessage(0, RS_SYS_WARNING, title, msg); + msg += "\n"; + msg += "Please get your friends to upgrade to the latest version"; + msg += "\n"; + msg += "\n"; + msg += "If you are sure the error was not caused by an old version"; + msg += "\n"; + msg += "Please report the problem to Retroshare's developers"; + msg += "\n"; - std::cerr << "pqistreamer::handle_incoming() ERROR: Read Packet too Big" << std::endl; - std::cerr << msg; - std::cerr << std::endl; + notify->AddLogMessage(0, RS_SYS_WARNING, title, msg); - } - mBio->close(); - mReading_state = reading_state_initial ; // restart at state 1. - mFailed_read_attempts = 0 ; - return -1; + std::cerr << "pqistreamer::handle_incoming() ERROR: Read Packet too Big" << std::endl; + std::cerr << msg; + std::cerr << std::endl; - // Used to exit now! exit(1); - } + } + mBio->close(); + mReading_state = reading_state_initial ; // restart at state 1. + mFailed_read_attempts = 0 ; + return -1; - if (extralen > 0) - { - void *extradata = (void *) (((char *) block) + blen); - int tmplen ; + // Used to exit now! exit(1); + } - // Don't reset the block now! If pqissl is in the middle of a multiple-chunk - // packet (larger than 16384 bytes), and pqistreamer jumped directly yo - // continue_packet:, then readdata is going to write after the beginning of - // extradata, yet not exactly at start -> the start of the packet would be wiped out. - // - // so, don't do that: - // memset( extradata,0,extralen ) ; + if (extralen > 0) + { + void *extradata = (void *) (((char *) block) + blen); + int tmplen ; - if (extralen != (tmplen = mBio->readdata(extradata, extralen))) - { + // Don't reset the block now! If pqissl is in the middle of a multiple-chunk + // packet (larger than 16384 bytes), and pqistreamer jumped directly yo + // continue_packet:, then readdata is going to write after the beginning of + // extradata, yet not exactly at start -> the start of the packet would be wiped out. + // + // so, don't do that: + // memset( extradata,0,extralen ) ; + + if (extralen != (tmplen = mBio->readdata(extradata, extralen))) + { #ifdef DEBUG_PQISTREAMER - if(tmplen > 0) - std::cerr << "[" << (void*)pthread_self() << "] " << "Incomplete packet read ! This is a real problem ;-)" << std::endl ; + if(tmplen > 0) + std::cerr << "[" << (void*)pthread_self() << "] " << "Incomplete packet read ! This is a real problem ;-)" << std::endl ; #endif - if(++mFailed_read_attempts > max_failed_read_attempts) - { - std::string out; - rs_sprintf(out, "Error Completing Read (read %d/%d)", tmplen, extralen); - std::cerr << out << std::endl ; - pqioutput(PQL_ALERT, pqistreamerzone, out); + if(++mFailed_read_attempts > max_failed_read_attempts) + { + std::string out; + rs_sprintf(out, "Error Completing Read (read %d/%d)", tmplen, extralen); + std::cerr << out << std::endl ; + pqioutput(PQL_ALERT, pqistreamerzone, out); - p3Notify *notify = RsServer::notify(); - if (notify) - { - std::string title = "Warning: Error Completing Read"; + p3Notify *notify = RsServer::notify(); + if (notify) + { + std::string title = "Warning: Error Completing Read"; - std::string msgout; - msgout = " **** WARNING **** \n"; - msgout += "Retroshare has experienced an unexpected Read ERROR"; - msgout += "\n"; - rs_sprintf_append(msgout, "(M:%d B:%d E:%d R:%d)\n", maxlen, blen, extralen, tmplen); - msgout += "\n"; - msgout += "Note: this error might as well happen (rarely) when a peer disconnects in between a transmission of a large packet.\n"; - msgout += "If it happens manny time, please contact the developers, and send them these numbers:"; - msgout += "\n"; + std::string msgout; + msgout = " **** WARNING **** \n"; + msgout += "Retroshare has experienced an unexpected Read ERROR"; + msgout += "\n"; + rs_sprintf_append(msgout, "(M:%d B:%d E:%d R:%d)\n", maxlen, blen, extralen, tmplen); + msgout += "\n"; + msgout += "Note: this error might as well happen (rarely) when a peer disconnects in between a transmission of a large packet.\n"; + msgout += "If it happens manny time, please contact the developers, and send them these numbers:"; + msgout += "\n"; - rs_sprintf_append(msgout, "block = %d %d %d %d %d %d %d %d\n", - (int)(((unsigned char*)block)[0]), - (int)(((unsigned char*)block)[1]), - (int)(((unsigned char*)block)[2]), - (int)(((unsigned char*)block)[3]), - (int)(((unsigned char*)block)[4]), - (int)(((unsigned char*)block)[5]), - (int)(((unsigned char*)block)[6]), - (int)(((unsigned char*)block)[7])); + msgout += "block = " ; + msgout += RsUtil::BinToHex((char*)block,8) + "\n" ; - //notify->AddSysMessage(0, RS_SYS_WARNING, title, msgout.str()); + std::cerr << msgout << std::endl; + } - std::cerr << msgout << std::endl; - } - - mBio->close(); - mReading_state = reading_state_initial ; // restart at state 1. - mFailed_read_attempts = 0 ; - return -1; - } - else - { + mBio->close(); + mReading_state = reading_state_initial ; // restart at state 1. + mFailed_read_attempts = 0 ; + return -1; + } + else + { #ifdef DEBUG_PQISTREAMER - std::cerr << "[" << (void*)pthread_self() << "] " << "given up 5, state = " << mReading_state << std::endl ; + std::cerr << "[" << (void*)pthread_self() << "] " << "given up 5, state = " << mReading_state << std::endl ; #endif - return 0 ; // this is just a SSL_WANT_READ error. Don't panic, we'll re-try the read soon. - // we assume readdata() returned either -1 or the complete read size. - } - } + return 0 ; // this is just a SSL_WANT_READ error. Don't panic, we'll re-try the read soon. + // we assume readdata() returned either -1 or the complete read size. + } + } #ifdef DEBUG_PQISTREAMER - std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << mReading_state << std::endl ; - std::cerr << "[" << (void*)pthread_self() << "] " << "block 2 : " << (int)(((unsigned char*)extradata)[0]) << " " << (int)(((unsigned char*)extradata)[1]) << " " << (int)(((unsigned char*)extradata)[2]) << " " << (int)(((unsigned char*)extradata)[3]) << " " - << (int)(((unsigned char*)extradata)[4]) << " " - << (int)(((unsigned char*)extradata)[5]) << " " - << (int)(((unsigned char*)extradata)[6]) << " " - << (int)(((unsigned char*)extradata)[7]) << " " << std::endl ; + std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << mReading_state << std::endl ; + std::cerr << "[" << (void*)pthread_self() << "] " << "block 2 : " << RsUtil::BinToHex(extradata,8) << std::endl; #endif - mFailed_read_attempts = 0 ; - readbytes += extralen; - } + mFailed_read_attempts = 0 ; + readbytes += extralen; + } - // create packet, based on header. + // create packet, based on header. #ifdef DEBUG_PQISTREAMER - { - std::string out; - rs_sprintf(out, "Read Data Block -> Incoming Pkt(%d)", blen + extralen); - //std::cerr << out ; - pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out); - } + { + std::string out; + rs_sprintf(out, "Read Data Block -> Incoming Pkt(%d)", blen + extralen); + //std::cerr << out ; + pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out); + } #endif - // std::cerr << "Deserializing packet of size " << pktlen <deserialise(block, &pktlen); - - if ((pkt != NULL) && (0 < handleincomingitem_locked(pkt,pktlen))) - { -#ifdef DEBUG_PQISTREAMER - pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "Successfully Read a Packet!"); -#endif - inReadBytes_locked(pktlen); // only count deserialised packets, because that's what is actually been transfered. - } - else if (!is_partial_packet) - { -#ifdef DEBUG_PQISTREAMER - pqioutput(PQL_ALERT, pqistreamerzone, "Failed to handle Packet!"); -#endif - std::cerr << "Incoming Packet could not be deserialised:" << std::endl; - std::cerr << " Incoming peer id: " << PeerId() << std::endl; - if(pktlen >= 8) - std::cerr << " Packet header : " << RsUtil::BinToHex((unsigned char*)block,8) << std::endl; - if(pktlen > 8) - std::cerr << " Packet data : " << RsUtil::BinToHex((unsigned char*)block+8,std::min(50u,pktlen-8)) << ((pktlen>58)?"...":"") << std::endl; - } + RsItem *pkt ; + std::cerr << "Got packet with header " << RsUtil::BinToHex((char*)block,8) << std::endl; - mReading_state = reading_state_initial ; // restart at state 1. - mFailed_read_attempts = 0 ; // reset failed read, as the packet has been totally read. - } + if(is_partial_packet) + { + std::cerr << "Inputing partial packet " << RsUtil::BinToHex((char*)block,8) << std::endl; - if(maxin > readbytes && mBio->moretoread(0)) - goto start_packet_read ; + pkt = addPartialPacket(block,pktlen,slice_offset,slice_packet_id,is_packet_starting,is_packet_ending) ; + } + else + pkt = mRsSerialiser->deserialise(block, &pktlen); + + if ((pkt != NULL) && (0 < handleincomingitem_locked(pkt,pktlen))) + { +#ifdef DEBUG_PQISTREAMER + pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "Successfully Read a Packet!"); +#endif + inReadBytes_locked(pktlen); // only count deserialised packets, because that's what is actually been transfered. + } + else if (!is_partial_packet) + { +#ifdef DEBUG_PQISTREAMER + pqioutput(PQL_ALERT, pqistreamerzone, "Failed to handle Packet!"); +#endif + std::cerr << "Incoming Packet could not be deserialised:" << std::endl; + std::cerr << " Incoming peer id: " << PeerId() << std::endl; + if(pktlen >= 8) + std::cerr << " Packet header : " << RsUtil::BinToHex((unsigned char*)block,8) << std::endl; + if(pktlen > 8) + std::cerr << " Packet data : " << RsUtil::BinToHex((unsigned char*)block+8,std::min(50u,pktlen-8)) << ((pktlen>58)?"...":"") << std::endl; + } + + mReading_state = reading_state_initial ; // restart at state 1. + mFailed_read_attempts = 0 ; // reset failed read, as the packet has been totally read. + } + + if(maxin > readbytes && mBio->moretoread(0)) + goto start_packet_read ; #ifdef DEBUG_TRANSFERS - if (readbytes >= maxin) - { - std::cerr << "pqistreamer::handleincoming() Stopped reading as readbytes >= maxin. Read " << readbytes << " bytes "; - std::cerr << std::endl; - } + if (readbytes >= maxin) + { + std::cerr << "pqistreamer::handleincoming() Stopped reading as readbytes >= maxin. Read " << readbytes << " bytes "; + std::cerr << std::endl; + } #endif - return 0; + return 0; } RsItem *pqistreamer::addPartialPacket(const void *block,uint32_t len,uint32_t slice_offset,uint32_t slice_packet_id,bool is_packet_starting,bool is_packet_ending)